From 0d455422b765b27882af30bc043014ea964d0fc9 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Mon, 20 Jun 2022 14:03:01 +0200 Subject: [PATCH] added aggregation strategy test + fixed bug in check for empty data in refactored aggregation strategy --- .../strategies/response/aggregation.py | 11 ++-- .../visitor/strategies/response/storage.py | 2 +- pyinfra/visitor/visitor.py | 6 +- test/unit_tests/queue_visitor_test.py | 4 +- .../server/aggregation_strategy_tets.py | 59 +++++++++++++++++++ 5 files changed, 70 insertions(+), 12 deletions(-) create mode 100644 test/unit_tests/server/aggregation_strategy_tets.py diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index f799092..db7ec3f 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -18,12 +18,11 @@ class AggregationStorageStrategy(ResponseStrategy): def handle_response(self, analysis_response, final=False): def upload_or_aggregate(analysis_payload): - return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) + request_metadata = omit(analysis_response, ["analysis_payloads"]) + return self.upload_or_aggregate(analysis_payload, request_metadata, last=not analysis_payloads.peek(False)) - request_metadata = omit(analysis_response, ["data"]) - result_data = peekable(analysis_response["data"]) - - yield from filter(is_not_nothing, map(upload_or_aggregate, result_data)) + analysis_payloads = peekable(analysis_response["analysis_payloads"]) + yield from filter(is_not_nothing, map(upload_or_aggregate, analysis_payloads)) def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): """analysis_payload : {data: ..., metadata: ...}""" @@ -33,7 +32,7 @@ class AggregationStorageStrategy(ResponseStrategy): self.add_analysis_payload_to_buffer(analysis_payload) - if analysis_payload["data"] or last: + if not analysis_payload["data"] or last: self.upload_aggregated_items(object_descriptor) # TODO: mappings such as object_name -> responseFile should be put in a separate interface mapping layer. # See the enum-formatter in image-prediction service for reference. diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py index b0be8cf..b14f410 100644 --- a/pyinfra/visitor/strategies/response/storage.py +++ b/pyinfra/visitor/strategies/response/storage.py @@ -11,6 +11,6 @@ class StorageStrategy(ResponseStrategy): def handle_response(self, body: dict): response_object_descriptor = self.get_response_object_descriptor(body) self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode())) - body.pop("data") + body.pop("analysis_payloads") body["responseFile"] = response_object_descriptor["object_name"] return body diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index d14a904..9f93cc3 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -40,8 +40,8 @@ class QueueVisitor: self.response_strategy = response_strategy or StorageStrategy() def __call__(self, queue_item_body): - analysis_result_body = self.load_items_from_storage_and_process_with_callback(queue_item_body) - return self.response_strategy(analysis_result_body) + analysis_response = self.load_items_from_storage_and_process_with_callback(queue_item_body) + return self.response_strategy(analysis_response) def load_items_from_storage_and_process_with_callback(self, queue_item_body): """Bundles the result from processing a storage item with the body of the corresponding queue item.""" @@ -52,7 +52,7 @@ class QueueVisitor: self.load_data, )(queue_item_body) - return {"data": callback_results, **queue_item_body} + return {"analysis_payloads": callback_results, **queue_item_body} def get_item_processor(self, queue_item_body): def process_storage_item(storage_item): diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 1bf3247..4bb2974 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -28,7 +28,7 @@ class TestVisitor: storage.clear_bucket(bucket_name) storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2")) response_body = visitor.load_items_from_storage_and_process_with_callback(body) - assert response_body["data"] == ["22"] + assert response_body["analysis_payloads"] == ["22"] @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session") def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name): @@ -38,4 +38,4 @@ class TestVisitor: assert "data" not in response_body assert json.loads( decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"])) - )["data"] == ["22"] + )["analysis_payloads"] == ["22"] diff --git a/test/unit_tests/server/aggregation_strategy_tets.py b/test/unit_tests/server/aggregation_strategy_tets.py new file mode 100644 index 0000000..268a6bf --- /dev/null +++ b/test/unit_tests/server/aggregation_strategy_tets.py @@ -0,0 +1,59 @@ +import pytest + +from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy + + +@pytest.mark.parametrize("client_name", ["mock"], scope="session") +class TestAggregationStorageStrategy: + def test_aggregation_strategy_with_no_empty_data_field(self, storage): + strat = AggregationStorageStrategy(storage=storage) + + analysis_response = { + "analysis_payloads": [ + {"data": [1], "metadata": {"id": 1}}, + {"data": [3], "metadata": {"id": 3}}, + ], + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + } + response_message_bodies = [*strat(analysis_response)] + assert response_message_bodies == [ + { + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + "id": 3, + "responseFile": "dossier0/file0/id:3.json.gz", + } + ] + + def test_aggregation_strategy_with_empty_data_field(self, storage): + strat = AggregationStorageStrategy(storage=storage) + + analysis_response = { + "analysis_payloads": [ + {"data": None, "metadata": {"id": 1}}, + {"data": [3], "metadata": {"id": 3}}, + ], + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + } + response_message_bodies = [*strat(analysis_response)] + assert response_message_bodies == [ + { + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + "id": 1, + "responseFile": "dossier0/file0/id:1.json.gz", + }, + { + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + "id": 3, + "responseFile": "dossier0/file0/id:3.json.gz", + }, + ]