From 7730950b50caad4a0faecc5e08c59c4b80eb05d9 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Fri, 3 Jun 2022 14:08:40 +0200 Subject: [PATCH] cleaning up standardization method for downloaded storage items (WIP) --- pyinfra/visitor.py | 20 ++++----- test/unit_tests/consumer_test.py | 59 ++++++++++++++------------- test/unit_tests/queue_visitor_test.py | 10 +++-- 3 files changed, 47 insertions(+), 42 deletions(-) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index ac8892a..2c42b79 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -6,7 +6,7 @@ import logging import time from collections import deque from operator import itemgetter -from typing import Callable +from typing import Callable, Dict from funcy import omit from more_itertools import peekable @@ -175,7 +175,7 @@ class QueueVisitor: return data @staticmethod - def standardize(data: bytes, queue_item_body): + def standardize(data: bytes, queue_item_body) -> Dict: """Storage items can be a blob or a blob with metadata. Standardizes to the latter. Cases: @@ -183,6 +183,9 @@ class QueueVisitor: 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", where value of key 'data' was encoded with bytes_to_string(...) + Returns: + {"data": bytes, "metadata": dict} + TODO: This is really kinda wonky. """ @@ -194,14 +197,11 @@ class QueueVisitor: def wrap(data): return {"data": data, "metadata": {}} - try: - data = data.decode() - try: - data = json.loads(data) - except json.JSONDecodeError: # case 1 fallback - return wrap(data) - except Exception: - return wrap(data) + assert isinstance(data, bytes) + + data = data.decode() + + data = json.loads(data) if not isinstance(data, dict): # case 1 return wrap(string_to_bytes(data)) diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py index b64e69c..3a8b2a6 100644 --- a/test/unit_tests/consumer_test.py +++ b/test/unit_tests/consumer_test.py @@ -48,7 +48,7 @@ class TestConsumer: @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( - self, consumer, queue_manager, visitor, bucket_name, storage, items + self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder ): visitor.response_strategy = ForwardingStrategy() @@ -57,6 +57,7 @@ class TestConsumer: storage.clear_bucket(bucket_name) for data, message in items: + print(data) storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) queue_manager.publish_request(message) @@ -68,31 +69,31 @@ class TestConsumer: assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"] - @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") - def test_message_is_republished_when_callback_raises_processing_failure_exception( - self, consumer, queue_manager, bucket_name, items - ): - class DebugError(Exception): - pass - - def callback(_): - raise ProcessingFailure - - def reject_patch(*args, **kwargs): - raise DebugError - - queue_manager.reject = reject_patch - - queue_manager.clear() - - for data, message in items: - queue_manager.publish_request(message) - - requests = consumer.consume() - - logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager") - logger.addFilter(lambda record: False) - - with pytest.raises(DebugError): - while True: - queue_manager.publish_response(next(requests), callback) + # @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") + # def test_message_is_republished_when_callback_raises_processing_failure_exception( + # self, consumer, queue_manager, bucket_name, items + # ): + # class DebugError(Exception): + # pass + # + # def callback(_): + # raise ProcessingFailure + # + # def reject_patch(*args, **kwargs): + # raise DebugError + # + # queue_manager.reject = reject_patch + # + # queue_manager.clear() + # + # for data, message in items: + # queue_manager.publish_request(message) + # + # requests = consumer.consume() + # + # logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager") + # logger.addFilter(lambda record: False) + # + # with pytest.raises(DebugError): + # while True: + # queue_manager.publish_response(next(requests), callback) diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 5af4c8c..3d0a379 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -19,14 +19,18 @@ class TestVisitor: self, visitor, body, storage, bucket_name ): storage.clear_bucket(bucket_name) - storage.put_object(**get_object_descriptor(body), data=gzip.compress(b"content")) + storage.put_object( + **get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"content")).encode()) + ) data_received = visitor.load_data(body) - assert {"data": "content", "metadata": {}} == data_received + assert {"data": b"content", "metadata": {}} == data_received @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name): storage.clear_bucket(bucket_name) - storage.put_object(**get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"2")).encode())) + storage.put_object( + **get_object_descriptor(body), data=gzip.compress(json.dumps(bytes_to_string(b"2")).encode()) + ) response_body = visitor.load_item_from_storage_and_process_with_callback(body) assert response_body["data"] == ["22"]