From 343637aaf7fd24d18ecd5738ef019225fdeee76f Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Mon, 20 Jun 2022 15:03:13 +0200 Subject: [PATCH] - modified aggr strat to produce only one response message, which now contains a list of response files instead of a single response file - introduced response formatter --- config.yaml | 1 + pyinfra/default_objects.py | 17 +++++++++++++- .../visitor/response_formatter/__init__.py | 0 .../visitor/response_formatter/formatter.py | 10 +++++++++ .../response_formatter/formatters/__init__.py | 0 .../response_formatter/formatters/default.py | 13 +++++++++++ .../response_formatter/formatters/identity.py | 6 +++++ .../strategies/response/aggregation.py | 22 +++++++++++-------- .../visitor/strategies/response/storage.py | 2 +- pyinfra/visitor/visitor.py | 13 ++++++++--- test/conftest.py | 2 ++ test/integration_tests/serve_test.py | 4 ++-- test/unit_tests/queue_visitor_test.py | 3 ++- .../server/aggregation_strategy_tets.py | 20 +++++------------ 14 files changed, 82 insertions(+), 31 deletions(-) create mode 100644 pyinfra/visitor/response_formatter/__init__.py create mode 100644 pyinfra/visitor/response_formatter/formatter.py create mode 100644 pyinfra/visitor/response_formatter/formatters/__init__.py create mode 100644 pyinfra/visitor/response_formatter/formatters/default.py create mode 100644 pyinfra/visitor/response_formatter/formatters/identity.py diff --git a/config.yaml b/config.yaml index 4a52b80..caf7eea 100755 --- a/config.yaml +++ b/config.yaml @@ -8,6 +8,7 @@ service: # Specifies, how to handle the `page` key of a request. "multi" will download all pages matching the list of pages # specified in the request download_strategy: $DOWNLOAD_STRATEGY|single + response_formatter: default probing_webserver: host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index ae66410..523fde9 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -15,6 +15,8 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.storage import storages from pyinfra.visitor import QueueVisitor from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy +from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter +from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter logger = logging.getLogger(__name__) @@ -27,7 +29,12 @@ def get_consumer(callback=None): @lru_cache(maxsize=None) def get_visitor(callback): - return QueueVisitor(storage=get_storage(), callback=callback, response_strategy=get_response_strategy()) + return QueueVisitor( + storage=get_storage(), + callback=callback, + response_strategy=get_response_strategy(), + response_formatter=get_response_formatter(), + ) @lru_cache(maxsize=None) @@ -59,6 +66,14 @@ def get_response_strategy(storage=None): return AggregationStorageStrategy(storage or get_storage()) +@lru_cache(maxsize=None) +def get_response_formatter(): + return { + "default": DefaultResponseFormatter(), + "identity": IdentityResponseFormatter() + }[CONFIG.service.response_formatter] + + class Callback: def __init__(self, base_url): self.base_url = base_url diff --git a/pyinfra/visitor/response_formatter/__init__.py b/pyinfra/visitor/response_formatter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/response_formatter/formatter.py b/pyinfra/visitor/response_formatter/formatter.py new file mode 100644 index 0000000..9364bb4 --- /dev/null +++ b/pyinfra/visitor/response_formatter/formatter.py @@ -0,0 +1,10 @@ +import abc + + +class ResponseFormatter(abc.ABC): + def __call__(self, message): + return self.format(message) + + @abc.abstractmethod + def format(self, message): + pass diff --git a/pyinfra/visitor/response_formatter/formatters/__init__.py b/pyinfra/visitor/response_formatter/formatters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/response_formatter/formatters/default.py b/pyinfra/visitor/response_formatter/formatters/default.py new file mode 100644 index 0000000..43bf489 --- /dev/null +++ b/pyinfra/visitor/response_formatter/formatters/default.py @@ -0,0 +1,13 @@ +from funcy import first + +from pyinfra.visitor.response_formatter.formatter import ResponseFormatter + + +class DefaultResponseFormatter(ResponseFormatter): + """ + TODO: Extend via using enums throughout the codebase instead of strings. + See the enum-formatter in image-prediction service for reference. + """ + + def format(self, message): + return {**message, "responseFile": first(message["response_files"])} diff --git a/pyinfra/visitor/response_formatter/formatters/identity.py b/pyinfra/visitor/response_formatter/formatters/identity.py new file mode 100644 index 0000000..6d92014 --- /dev/null +++ b/pyinfra/visitor/response_formatter/formatters/identity.py @@ -0,0 +1,6 @@ +from pyinfra.visitor.response_formatter.formatter import ResponseFormatter + + +class IdentityResponseFormatter(ResponseFormatter): + def format(self, message): + return message diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index c3e5bd9..928b389 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -15,6 +15,7 @@ class AggregationStorageStrategy(ResponseStrategy): self.storage = storage self.merger = merger or list self.buffer = deque() + self.response_files = deque() def handle_response(self, analysis_response, final=False): def upload_or_aggregate(analysis_payload): @@ -34,23 +35,26 @@ class AggregationStorageStrategy(ResponseStrategy): if analysis_payload["data"] or last: self.upload_aggregated_items(object_descriptor) - # TODO: mappings such as object_name -> responseFile should be put in a separate interface mapping layer. - # See the enum-formatter in image-prediction service for reference. - return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} - else: - return Nothing + self.response_files.append(object_descriptor["object_name"]) - # TODO: aggregate response files and make responseFile -> responseFiles + return self.build_response_message(storage_upload_info) if last else Nothing def add_analysis_payload_to_buffer(self, analysis_payload): self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])}) + def upload_aggregated_items(self, object_descriptor): + self.upload_item(self.merge_queue_items(), object_descriptor) + + def build_response_message(self, storage_upload_info): + + response_files = [*self.response_files] + self.response_files.clear() + + return {**storage_upload_info, "response_files": response_files} + def upload_item(self, analysis_payload, object_descriptor): self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload)) - def upload_aggregated_items(self, object_descriptor): - return self.upload_item(self.merge_queue_items(), object_descriptor) - def merge_queue_items(self): merged_buffer_content = self.merger(self.buffer) self.buffer.clear() diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py index b14f410..aa5d60b 100644 --- a/pyinfra/visitor/strategies/response/storage.py +++ b/pyinfra/visitor/strategies/response/storage.py @@ -12,5 +12,5 @@ class StorageStrategy(ResponseStrategy): response_object_descriptor = self.get_response_object_descriptor(body) self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode())) body.pop("analysis_payloads") - body["responseFile"] = response_object_descriptor["object_name"] + body["response_files"] = [response_object_descriptor["object_name"]] return body diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index 9f93cc3..fe61531 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -4,9 +4,11 @@ from funcy import lflatten, compose from pyinfra.storage.storage import Storage from pyinfra.utils.func import lift -from pyinfra.visitor.strategies.download.download import DownloadStrategy -from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy +from pyinfra.visitor.response_formatter.formatter import ResponseFormatter +from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy +from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy +from pyinfra.visitor.strategies.download.download import DownloadStrategy from pyinfra.visitor.strategies.response.response import ResponseStrategy from pyinfra.visitor.strategies.response.storage import StorageStrategy from pyinfra.visitor.utils import standardize, get_download_strategy @@ -20,6 +22,7 @@ class QueueVisitor: download_strategy: DownloadStrategy = None, parsing_strategy: BlobParsingStrategy = None, response_strategy: ResponseStrategy = None, + response_formatter: ResponseFormatter = None, ): """Processes queue messages that specify items on a storage to process with a given callback. @@ -38,10 +41,14 @@ class QueueVisitor: self.download_strategy = download_strategy or get_download_strategy() self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() self.response_strategy = response_strategy or StorageStrategy() + self.response_formatter = response_formatter or IdentityResponseFormatter() def __call__(self, queue_item_body): analysis_response = self.load_items_from_storage_and_process_with_callback(queue_item_body) - return self.response_strategy(analysis_response) + response = self.response_strategy(analysis_response) + response = self.response_formatter(response) + + return response def load_items_from_storage_and_process_with_callback(self, queue_item_body): """Bundles the result from processing a storage item with the body of the corresponding queue item.""" diff --git a/test/conftest.py b/test/conftest.py index 1293222..4291894 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -4,6 +4,8 @@ from pyinfra.config import CONFIG as MAIN_CONFIG MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) +MAIN_CONFIG["service"]["response_formatter"] = "identity" + import logging import time from unittest.mock import Mock diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index b34d512..aa56497 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -4,7 +4,7 @@ from itertools import starmap, repeat, chain from operator import itemgetter import pytest -from funcy import compose, lpluck, first, second +from funcy import compose, lpluck, first, second, pluck, lflatten from pyinfra.default_objects import ( get_callback, @@ -172,7 +172,7 @@ def build_filepath(object_descriptor, page): def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name): - names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list()) + names_of_uploaded_files = lflatten(pluck("response_files", queue_manager.output_queue.to_list())) uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files)) outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0)) return outputs diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 4bb2974..a7cdb41 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -1,6 +1,7 @@ import json import pytest +from funcy import first from pyinfra.utils.encoding import decompress from test.utils.storage import pack_for_upload @@ -37,5 +38,5 @@ class TestVisitor: response_body = visitor(body) assert "data" not in response_body assert json.loads( - decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"])) + decompress(storage.get_object(bucket_name=bucket_name, object_name=first(response_body["response_files"]))) )["analysis_payloads"] == ["22"] diff --git a/test/unit_tests/server/aggregation_strategy_tets.py b/test/unit_tests/server/aggregation_strategy_tets.py index c794dc8..3c40527 100644 --- a/test/unit_tests/server/aggregation_strategy_tets.py +++ b/test/unit_tests/server/aggregation_strategy_tets.py @@ -7,7 +7,7 @@ from pyinfra.visitor.strategies.response.aggregation import AggregationStorageSt @pytest.mark.parametrize("client_name", ["mock"], scope="session") class TestAggregationStorageStrategy: - def test_aggregation_strategy_with_no_empty_data_field(self, storage): + def test_aggregation_strategy_with_no_empty_data_field_causes_two_uploads(self, storage): strat = AggregationStorageStrategy(storage=storage) analysis_response = { @@ -21,24 +21,16 @@ class TestAggregationStorageStrategy: } response_message_bodies = [*strat(analysis_response)] assert response_message_bodies == [ - { - "dossierId": "dossier0", - "fileId": "file0", - "pages": [0, 2], - "id": 1, - "responseFile": "dossier0/file0/id:1.json.gz", - }, { "dossierId": "dossier0", "fileId": "file0", "pages": [0, 2], "id": 3, - "responseFile": "dossier0/file0/id:3.json.gz", - }, + "response_files": ["dossier0/file0/id:1.json.gz", "dossier0/file0/id:3.json.gz"], + } ] - - def test_aggregation_strategy_with_empty_data_field(self, storage): + def test_aggregation_strategy_with_empty_data_field_causes_single_uploads(self, storage): strat = AggregationStorageStrategy(storage=storage) analysis_response = { @@ -57,6 +49,6 @@ class TestAggregationStorageStrategy: "fileId": "file0", "pages": [0, 2], "id": 3, - "responseFile": "dossier0/file0/id:3.json.gz", - }, + "response_files": ["dossier0/file0/id:3.json.gz"], + } ]