From 5392d032959e7b03bd28164cd496d77ff1026ea3 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Mon, 20 Jun 2022 12:27:40 +0200 Subject: [PATCH] refactoring; removed dispatch callback from aggregation strategy --- pyinfra/utils/encoding.py | 6 +-- pyinfra/visitor/dispatch/__init__.py | 0 pyinfra/visitor/dispatch/dispatch.py | 8 ---- .../visitor/dispatch/identifier_dispatch.py | 21 --------- .../strategies/response/aggregation.py | 44 +++++++++---------- test/integration_tests/serve_test.py | 2 +- test/unit_tests/consumer_test.py | 2 +- test/unit_tests/queue_visitor_test.py | 3 +- test/utils/storage.py | 8 ++++ 9 files changed, 34 insertions(+), 60 deletions(-) delete mode 100644 pyinfra/visitor/dispatch/__init__.py delete mode 100644 pyinfra/visitor/dispatch/dispatch.py delete mode 100644 pyinfra/visitor/dispatch/identifier_dispatch.py create mode 100644 test/utils/storage.py diff --git a/pyinfra/utils/encoding.py b/pyinfra/utils/encoding.py index 146853b..57f60bb 100644 --- a/pyinfra/utils/encoding.py +++ b/pyinfra/utils/encoding.py @@ -1,11 +1,9 @@ import gzip import json -from pyinfra.server.packing import bytes_to_string - -def pack_for_upload(data: bytes): - return compress(json.dumps(bytes_to_string(data)).encode()) +def pack_analysis_payload(analysis_payload): + return compress(json.dumps(analysis_payload).encode()) def compress(data: bytes): diff --git a/pyinfra/visitor/dispatch/__init__.py b/pyinfra/visitor/dispatch/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyinfra/visitor/dispatch/dispatch.py b/pyinfra/visitor/dispatch/dispatch.py deleted file mode 100644 index a2e25d5..0000000 --- a/pyinfra/visitor/dispatch/dispatch.py +++ /dev/null @@ -1,8 +0,0 @@ -import abc - - -# TODO: rename -class DispatchCallback(abc.ABC): - @abc.abstractmethod - def __call__(self, payload): - pass diff --git a/pyinfra/visitor/dispatch/identifier_dispatch.py b/pyinfra/visitor/dispatch/identifier_dispatch.py deleted file mode 100644 index 17842ee..0000000 --- a/pyinfra/visitor/dispatch/identifier_dispatch.py +++ /dev/null @@ -1,21 +0,0 @@ -from _operator import itemgetter - -from pyinfra.visitor.dispatch.dispatch import DispatchCallback - - -class IdentifierDispatchCallback(DispatchCallback): - def __init__(self): - self.identifier = None - - def has_new_identifier(self, metadata): - - identifier = ":".join(itemgetter("fileId", "dossierId")(metadata)) - - if not self.identifier: - self.identifier = identifier - - return identifier != self.identifier - - def __call__(self, metadata): - - return self.has_new_identifier(metadata) diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index d3845f9..f799092 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -1,4 +1,3 @@ -import json from collections import deque from typing import Callable @@ -6,18 +5,15 @@ from funcy import omit, filter from more_itertools import peekable from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing -from pyinfra.utils.encoding import compress -from pyinfra.visitor.dispatch.dispatch import DispatchCallback -from pyinfra.visitor.dispatch.identifier_dispatch import IdentifierDispatchCallback +from pyinfra.utils.encoding import pack_analysis_payload from pyinfra.visitor.strategies.response.response import ResponseStrategy from pyinfra.visitor.utils import build_storage_upload_info class AggregationStorageStrategy(ResponseStrategy): - def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None): + def __init__(self, storage, merger: Callable = None): self.storage = storage self.merger = merger or list - self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback() self.buffer = deque() def handle_response(self, analysis_response, final=False): @@ -33,26 +29,26 @@ class AggregationStorageStrategy(ResponseStrategy): """analysis_payload : {data: ..., metadata: ...}""" storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata) - analysis_payload["metadata"].pop("id") - - if analysis_payload["data"]: - return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info) - - else: - self.buffer.append(analysis_payload) - if last or self.dispatch_callback(storage_upload_info): - return self.upload_queue_items(storage_upload_info) - else: - return Nothing - - def put_object(self, data: bytes, storage_upload_info): object_descriptor = self.get_response_object_descriptor(storage_upload_info) - self.storage.put_object(**object_descriptor, data=compress(data)) - return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} - def upload_queue_items(self, storage_upload_info): - data = json.dumps(self.merge_queue_items()).encode() - return self.put_object(data, storage_upload_info) + self.add_analysis_payload_to_buffer(analysis_payload) + + if analysis_payload["data"] or last: + self.upload_aggregated_items(object_descriptor) + # TODO: mappings such as object_name -> responseFile should be put in a separate interface mapping layer. + # See the enum-formatter in image-prediction service for reference. + return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} + else: + return Nothing + + def add_analysis_payload_to_buffer(self, analysis_payload): + self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])}) + + def upload_item(self, analysis_payload, object_descriptor): + self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload)) + + def upload_aggregated_items(self, object_descriptor): + return self.upload_item(self.merge_queue_items(), object_descriptor) def merge_queue_items(self): merged_buffer_content = self.merger(self.buffer) diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 92790e7..b34d512 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -38,7 +38,7 @@ from test.utils.input import pair_data_with_queue_message @pytest.mark.parametrize( "analysis_task", [ - # False, + False, True, ], ) diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py index a15d782..0fc75f4 100644 --- a/test/unit_tests/consumer_test.py +++ b/test/unit_tests/consumer_test.py @@ -5,7 +5,7 @@ import pytest from funcy import lmapcat from pyinfra.exceptions import ProcessingFailure -from pyinfra.utils.encoding import pack_for_upload +from test.utils.storage import pack_for_upload from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 5b722c8..1bf3247 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -2,7 +2,8 @@ import json import pytest -from pyinfra.utils.encoding import pack_for_upload, decompress +from pyinfra.utils.encoding import decompress +from test.utils.storage import pack_for_upload from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy diff --git a/test/utils/storage.py b/test/utils/storage.py new file mode 100644 index 0000000..e13fee8 --- /dev/null +++ b/test/utils/storage.py @@ -0,0 +1,8 @@ +import json + +from pyinfra.server.packing import bytes_to_string +from pyinfra.utils.encoding import compress + + +def pack_for_upload(data: bytes): + return compress(json.dumps(bytes_to_string(data)).encode())