refactoring; tweaked json-blob-parser; added standardization case for decodable strings as storage items

This commit is contained in:
Matthias Bisping 2022-06-07 14:55:40 +02:00
parent f718b2f7ef
commit c55e41f2d8
8 changed files with 60 additions and 27 deletions

View File

@ -1,7 +1,11 @@
import logging
from funcy import rcompose
from pyinfra.parser.blob_parser import ParsingError
logger = logging.getLogger(__name__)
class Either:
def __init__(self, item):
@ -23,6 +27,13 @@ class EitherParserWrapper:
def __init__(self, parser):
    """Wrap *parser* so its results can be routed through the Either machinery.

    Args:
        parser: the underlying parser object; its ``parse`` is invoked by
            this wrapper elsewhere in the class.
    """
    # Kept as a plain attribute; the wrapper only delegates to it.
    self.parser = parser
def __log(self, result):
    """Emit a debug line describing *result*, then return it unchanged.

    A ``Right`` result means the wrapped parser succeeded (or a previous
    success was forwarded); anything else is treated as a failure.
    """
    parser_name = self.parser.__class__.__name__
    payload = result.bind()
    if isinstance(result, Right):
        outcome = "succeeded or forwarded"
    else:
        outcome = "failed"
    logger.debug(f"{parser_name} {outcome} on {payload}")
    # Pass-through: logging must not alter the parse pipeline's value.
    return result
def parse(self, item: Either):
if isinstance(item, Left):
@ -38,7 +49,7 @@ class EitherParserWrapper:
return self.parse(Left(item))
def __call__(self, item):
return self.parse(item)
return self.__log(self.parse(item))
class EitherParserComposer:

View File

@ -13,7 +13,9 @@ class JsonBlobParser(BlobParser):
except (UnicodeDecodeError, json.JSONDecodeError, AttributeError) as err:
raise ParsingError from err
if "data" in data:
try:
data["data"] = string_to_bytes(data["data"])
except (KeyError, TypeError) as err:
raise ParsingError from err
return data

View File

@ -133,15 +133,17 @@ class PikaQueueManager(QueueManager):
def pull_request(self):
    """Fetch a single message from the configured input queue.

    Delegates to the channel's ``basic_get`` on ``self._input_queue`` and
    returns its result directly.
    # NOTE(review): with pika this is presumably a (method, properties, body)
    # tuple, or all-None when the queue is empty — confirm against callers.
    """
    return self.channel.basic_get(self._input_queue)
def consume(self, inactivity_timeout=None):
def consume(self, inactivity_timeout=None, n=None):
print(f"{n=}")
logger.debug("Consuming")
return self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
gen = self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
yield from islice(gen, n)
def consume_and_publish(self, visitor: QueueVisitor, n=None):
logger.info(f"Consuming input queue.")
for message in islice(self.consume(), n):
for message in self.consume(n=n):
self.publish_response(message, visitor)
def basic_consume_and_publish(self, visitor: QueueVisitor):

View File

@ -0,0 +1,8 @@
import gzip
import json
from pyinfra.server.packing import bytes_to_string
def pack_for_upload(data: bytes):
    """Prepare raw bytes for upload.

    Pipeline: convert the bytes to their string form via ``bytes_to_string``,
    JSON-serialize that string, UTF-8 encode it, and gzip-compress the result.

    Args:
        data: raw payload bytes.

    Returns:
        Gzip-compressed bytes of the JSON-encoded string representation.
    """
    as_string = bytes_to_string(data)
    as_json = json.dumps(as_string)
    return gzip.compress(as_json.encode())

View File

@ -17,8 +17,11 @@ from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.server.packing import string_to_bytes
from pyinfra.storage.storage import Storage
logger = logging.getLogger(__name__)
def unique_hash(pages, seed=""):
assert isinstance(seed, str)
@ -219,7 +222,7 @@ class QueueVisitor:
return data
def standardize(self, data: bytes) -> Dict:
def standardize(self, data) -> Dict:
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
Cases:
@ -231,14 +234,24 @@ class QueueVisitor:
{"data": bytes, "metadata": dict}
"""
if isinstance(data, bytes): # case 1
def is_blob_without_metadata(data):
return isinstance(data, bytes)
def is_blob_with_metadata(data: Dict):
return isinstance(data, dict)
if is_blob_without_metadata(data):
return wrap(data)
else: # case 2
assert isinstance(data, dict)
elif is_blob_with_metadata(data):
validate(data)
return data
else: # Fallback / used for testing with simple data
logger.warning("Encountered storage data in unexpected format.")
assert isinstance(data, str)
return wrap(string_to_bytes(data))
def load_data(self, queue_item_body):
object_descriptor = get_object_descriptor(queue_item_body)
logging.debug(f"Downloading {object_descriptor}...")

View File

@ -1,12 +1,14 @@
import json
import logging
import time
from unittest.mock import Mock
import pika
import pytest
from testcontainers.compose import DockerCompose
from pyinfra.exceptions import UnknownClient
from pyinfra.locations import TEST_DIR
from pyinfra.locations import TEST_DIR, COMPOSE_PATH
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager, get_connection_params
from pyinfra.queue.queue_manager.queue_manager import QueueManager
from pyinfra.storage.adapters.azure import AzureStorageAdapter
@ -88,11 +90,11 @@ def storage(client_name, bucket_name, request, docker_compose):
storage.clear_bucket(bucket_name)
@pytest.fixture(scope="session", autouse=False)
@pytest.fixture(scope="session", autouse=True)
def docker_compose(sleep_seconds=30):
pass
# logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
# compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
# compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
# compose.start()
# logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
# time.sleep(sleep_seconds)

View File

@ -7,11 +7,12 @@ from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.server.packing import bytes_to_string
def test_json_parser():
d = {"a": 1}
assert JsonBlobParser()(json.dumps(d).encode()) == d
d = {"data": bytes_to_string(b"aa")}
assert JsonBlobParser()(json.dumps(d).encode()) == {"data": b"aa"}
def test_string_parser():
@ -27,8 +28,8 @@ def test_identity_parser():
def test_either_parser_composer():
parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
d = {"a": 1}
assert parser(json.dumps(d).encode()) == d
d = {"data": bytes_to_string(b"aa")}
assert parser(json.dumps(d).encode()) == {"data": b"aa"}
a = "a"
assert parser(a.encode()) == a

View File

@ -3,8 +3,8 @@ import json
import pytest
from pyinfra.server.packing import bytes_to_string
from pyinfra.visitor import get_object_descriptor, get_response_object_descriptor
from pyinfra.utils.encoding import pack_for_upload
from pyinfra.visitor import get_object_descriptor
@pytest.fixture()
@ -19,27 +19,21 @@ class TestVisitor:
self, visitor, body, storage, bucket_name
):
storage.clear_bucket(bucket_name)
storage.put_object(
**get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"content")).encode())
)
storage.put_object(**get_object_descriptor(body), data=pack_for_upload(b"content"))
data_received = visitor.load_data(body)
assert {"data": b"content", "metadata": {}} == data_received
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(
**get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"2")).encode())
)
storage.put_object(**get_object_descriptor(body), data=pack_for_upload(b"2"))
response_body = visitor.load_item_from_storage_and_process_with_callback(body)
assert response_body["data"] == ["22"]
@pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(
**get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"2")).encode())
)
storage.put_object(**get_object_descriptor(body), data=pack_for_upload(b"2"))
response_body = visitor(body)
assert "data" not in response_body
assert json.loads(