From c55e41f2d8de75af146170378b5db96f614cdc01 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 7 Jun 2022 14:55:40 +0200 Subject: [PATCH] refactoring; tweaked json-blob-parser; added standardization case for decodable strings as storage items --- pyinfra/parser/parser_composer.py | 13 +++++++++++- pyinfra/parser/parsers/json.py | 4 +++- .../queue/queue_manager/pika_queue_manager.py | 8 ++++--- pyinfra/utils/encoding.py | 8 +++++++ pyinfra/visitor.py | 21 +++++++++++++++---- test/conftest.py | 8 ++++--- test/parser/parser_test.py | 9 ++++---- test/unit_tests/queue_visitor_test.py | 16 +++++--------- 8 files changed, 60 insertions(+), 27 deletions(-) create mode 100644 pyinfra/utils/encoding.py diff --git a/pyinfra/parser/parser_composer.py b/pyinfra/parser/parser_composer.py index 1b22c2d..a0b7bc9 100644 --- a/pyinfra/parser/parser_composer.py +++ b/pyinfra/parser/parser_composer.py @@ -1,7 +1,11 @@ +import logging + from funcy import rcompose from pyinfra.parser.blob_parser import ParsingError +logger = logging.getLogger(__name__) + class Either: def __init__(self, item): @@ -23,6 +27,13 @@ class EitherParserWrapper: def __init__(self, parser): self.parser = parser + def __log(self, result): + if isinstance(result, Right): + logger.debug(f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}") + else: + logger.debug(f"{self.parser.__class__.__name__} failed on {result.bind()}") + return result + def parse(self, item: Either): if isinstance(item, Left): @@ -38,7 +49,7 @@ class EitherParserWrapper: return self.parse(Left(item)) def __call__(self, item): - return self.parse(item) + return self.__log(self.parse(item)) class EitherParserComposer: diff --git a/pyinfra/parser/parsers/json.py b/pyinfra/parser/parsers/json.py index b666ce1..0ff9d1a 100644 --- a/pyinfra/parser/parsers/json.py +++ b/pyinfra/parser/parsers/json.py @@ -13,7 +13,9 @@ class JsonBlobParser(BlobParser): except (UnicodeDecodeError, json.JSONDecodeError, AttributeError) as err: raise ParsingError from err - if "data" in data: + try: data["data"] = string_to_bytes(data["data"]) + except (KeyError, TypeError) as err: + raise ParsingError from err return data diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 278e446..2d036f3 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -133,15 +133,17 @@ class PikaQueueManager(QueueManager): def pull_request(self): return self.channel.basic_get(self._input_queue) - def consume(self, inactivity_timeout=None): + def consume(self, inactivity_timeout=None, n=None): + print(f"{n=}") logger.debug("Consuming") - return self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout) + gen = self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout) + yield from islice(gen, n) def consume_and_publish(self, visitor: QueueVisitor, n=None): logger.info(f"Consuming input queue.") - for message in islice(self.consume(), n): + for message in self.consume(n=n): self.publish_response(message, visitor) def basic_consume_and_publish(self, visitor: QueueVisitor): diff --git a/pyinfra/utils/encoding.py b/pyinfra/utils/encoding.py new file mode 100644 index 0000000..d6ed7d3 --- /dev/null +++ b/pyinfra/utils/encoding.py @@ -0,0 +1,8 @@ +import gzip +import json + +from pyinfra.server.packing import bytes_to_string + + +def pack_for_upload(data: bytes): + return gzip.compress(json.dumps(bytes_to_string(data)).encode()) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index ef2ed86..03e83f2 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -17,8 +17,11 @@ from pyinfra.parser.parser_composer import EitherParserComposer from pyinfra.parser.parsers.identity import IdentityBlobParser from pyinfra.parser.parsers.json import JsonBlobParser from pyinfra.parser.parsers.string import StringBlobParser +from pyinfra.server.packing import string_to_bytes from pyinfra.storage.storage import Storage +logger = logging.getLogger(__name__) + def unique_hash(pages, seed=""): assert isinstance(seed, str) @@ -219,7 +222,7 @@ class QueueVisitor: return data - def standardize(self, data: bytes) -> Dict: + def standardize(self, data) -> Dict: """Storage items can be a blob or a blob with metadata. Standardizes to the latter. Cases: @@ -231,14 +234,24 @@ class QueueVisitor: {"data": bytes, "metadata": dict} """ - if isinstance(data, bytes): # case 1 + def is_blob_without_metadata(data): + return isinstance(data, bytes) + + def is_blob_with_metadata(data: Dict): + return isinstance(data, dict) + + if is_blob_without_metadata(data): return wrap(data) - else: # case 2 - assert isinstance(data, dict) + elif is_blob_with_metadata(data): validate(data) return data + else: # Fallback / used for testing with simple data + logger.warning("Encountered storage data in unexpected format.") + assert isinstance(data, str) + return wrap(string_to_bytes(data)) + def load_data(self, queue_item_body): object_descriptor = get_object_descriptor(queue_item_body) logging.debug(f"Downloading {object_descriptor}...") diff --git a/test/conftest.py b/test/conftest.py index c7b60cd..ad3db72 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,12 +1,14 @@ import json import logging +import time from unittest.mock import Mock import pika import pytest +from testcontainers.compose import DockerCompose from pyinfra.exceptions import UnknownClient -from pyinfra.locations import TEST_DIR +from pyinfra.locations import TEST_DIR, COMPOSE_PATH from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager, get_connection_params from pyinfra.queue.queue_manager.queue_manager import QueueManager from pyinfra.storage.adapters.azure import AzureStorageAdapter @@ -88,11 +90,11 @@ def storage(client_name, bucket_name, request, docker_compose): storage.clear_bucket(bucket_name) -@pytest.fixture(scope="session", autouse=False) +@pytest.fixture(scope="session", autouse=True) def docker_compose(sleep_seconds=30): pass # logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") - # compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") + # compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") # compose.start() # logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ") # time.sleep(sleep_seconds) diff --git a/test/parser/parser_test.py b/test/parser/parser_test.py index de40692..ba2ba63 100644 --- a/test/parser/parser_test.py +++ b/test/parser/parser_test.py @@ -7,11 +7,12 @@ from pyinfra.parser.parser_composer import EitherParserComposer from pyinfra.parser.parsers.identity import IdentityBlobParser from pyinfra.parser.parsers.json import JsonBlobParser from pyinfra.parser.parsers.string import StringBlobParser +from pyinfra.server.packing import bytes_to_string def test_json_parser(): - d = {"a": 1} - assert JsonBlobParser()(json.dumps(d).encode()) == d + d = {"data": bytes_to_string(b"aa")} + assert JsonBlobParser()(json.dumps(d).encode()) == {"data": b"aa"} def test_string_parser(): @@ -27,8 +28,8 @@ def test_identity_parser(): def test_either_parser_composer(): parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser()) - d = {"a": 1} - assert parser(json.dumps(d).encode()) == d + d = {"data": bytes_to_string(b"aa")} + assert parser(json.dumps(d).encode()) == {"data": b"aa"} a = "a" assert parser(a.encode()) == a diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 3d0a379..87a184d 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -3,8 +3,8 @@ import json import pytest -from pyinfra.server.packing import bytes_to_string -from pyinfra.visitor import get_object_descriptor, get_response_object_descriptor +from pyinfra.utils.encoding import pack_for_upload +from pyinfra.visitor import get_object_descriptor @pytest.fixture() @@ -19,27 +19,21 @@ class TestVisitor: self, visitor, body, storage, bucket_name ): storage.clear_bucket(bucket_name) - storage.put_object( - **get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"content")).encode()) - ) + storage.put_object(**get_object_descriptor(body), data=pack_for_upload(b"content")) data_received = visitor.load_data(body) assert {"data": b"content", "metadata": {}} == data_received @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name): storage.clear_bucket(bucket_name) - storage.put_object( - **get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"2")).encode()) - ) + storage.put_object(**get_object_descriptor(body), data=pack_for_upload(b"2")) response_body = visitor.load_item_from_storage_and_process_with_callback(body) assert response_body["data"] == ["22"] @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session") def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name): storage.clear_bucket(bucket_name) - storage.put_object( - **get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"2")).encode()) - ) + storage.put_object(**get_object_descriptor(body), data=pack_for_upload(b"2")) response_body = visitor(body) assert "data" not in response_body assert json.loads(