From 2c902f3129c5f19761a8dd8dbf4988534d695628 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Sun, 19 Jun 2022 00:06:24 +0200 Subject: [PATCH] refactored visitor init --- pyinfra/visitor/utils.py | 18 +++++++++--------- pyinfra/visitor/visitor.py | 26 ++++++++++++++++++++------ test/conftest.py | 2 +- test/integration_tests/serve_test.py | 2 +- test/unit_tests/queue_visitor_test.py | 2 +- 5 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pyinfra/visitor/utils.py b/pyinfra/visitor/utils.py index 242704b..b6ce410 100644 --- a/pyinfra/visitor/utils.py +++ b/pyinfra/visitor/utils.py @@ -22,15 +22,6 @@ def build_file_path(storage_upload_info, folder): return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "") -def validate(data): - if not ("data" in data and "metadata" in data): - raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") - - -def wrap(data): - return {"data": data, "metadata": {}} - - def standardize(data) -> Dict: """Storage items can be a blob or a blob with metadata. Standardizes to the latter. @@ -62,6 +53,15 @@ def standardize(data) -> Dict: return wrap(string_to_bytes(data)) +def wrap(data): + return {"data": data, "metadata": {}} + + +def validate(data): + if not ("data" in data and "metadata" in data): + raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") + + def get_download_strategy(download_strategy_type=None): download_strategies = { "single": SingleDownloadStrategy(), diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index c0f6ca0..9164346 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -8,6 +8,7 @@ from pyinfra.visitor.strategies.download.download import DownloadStrategy from pyinfra.visitor.strategies.parsing.dynamic import DynamicParsingStrategy from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy from pyinfra.visitor.strategies.response.response import ResponseStrategy +from pyinfra.visitor.strategies.response.storage import StorageStrategy from pyinfra.visitor.utils import standardize, get_download_strategy @@ -16,21 +17,33 @@ class QueueVisitor: self, storage: Storage, callback: Callable, - response_strategy: ResponseStrategy, - parsing_strategy: ParsingStrategy = None, download_strategy: DownloadStrategy = None, + parsing_strategy: ParsingStrategy = None, + response_strategy: ResponseStrategy = None, ): + """Processes queue messages that specify items on a storage to process with a given callback. + + Args: + storage: storage to pull items specified by queue message from + callback: callback to apply to storage items + download_strategy: behaviour for loading items from the storage + parsing_strategy: behaviour for interpreting storage items + response_strategy: behaviour for response production + + Returns: + depends on response strategy + """ self.storage = storage self.callback = callback - self.response_strategy = response_strategy - self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() self.download_strategy = download_strategy or get_download_strategy() + self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() + self.response_strategy = response_strategy or StorageStrategy() def __call__(self, queue_item_body): - analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) + analysis_result_body = self.load_items_from_storage_and_process_with_callback(queue_item_body) return self.response_strategy(analysis_result_body) - def load_item_from_storage_and_process_with_callback(self, queue_item_body): + def load_items_from_storage_and_process_with_callback(self, queue_item_body): """Bundles the result from processing a storage item with the body of the corresponding queue item.""" callback_results = compose( @@ -54,6 +67,7 @@ class QueueVisitor: lift(self.parsing_strategy), self.download_strategy, )(self.storage, queue_item_body) + return data def process_storage_item(self, data_metadata_pack): diff --git a/test/conftest.py b/test/conftest.py index 416b372..1293222 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -197,4 +197,4 @@ def response_strategy(response_strategy_name, storage): @pytest.fixture() def visitor(storage, analysis_callback, response_strategy): - return QueueVisitor(storage, analysis_callback, response_strategy) + return QueueVisitor(storage=storage, callback=analysis_callback, response_strategy=response_strategy) diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 9046e95..92790e7 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -232,7 +232,7 @@ def download_strategy(many_to_n): def test_components(url, queue_manager, storage): callback = get_callback(url) - visitor = QueueVisitor(storage, callback, get_response_strategy(storage)) + visitor = QueueVisitor(storage=storage, callback=callback, response_strategy=get_response_strategy(storage)) consumer = Consumer(visitor, queue_manager) return storage, queue_manager, consumer diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index e5da21e..5b722c8 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -26,7 +26,7 @@ class TestVisitor: def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name): storage.clear_bucket(bucket_name) storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2")) - response_body = visitor.load_item_from_storage_and_process_with_callback(body) + response_body = visitor.load_items_from_storage_and_process_with_callback(body) assert response_body["data"] == ["22"] @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")