Refactoring: removed the dispatch callback from the aggregation strategy; the upload decision is now based on the payload content and the `last` flag.

This commit is contained in:
Matthias Bisping 2022-06-20 12:27:40 +02:00
parent aeb60a97a9
commit 5392d03295
9 changed files with 34 additions and 60 deletions

View File

@ -1,11 +1,9 @@
import gzip
import json
from pyinfra.server.packing import bytes_to_string
def pack_for_upload(data: bytes):
    """Encode raw bytes as a JSON document and gzip-compress the result."""
    serialized = json.dumps(bytes_to_string(data))
    return compress(serialized.encode())
def pack_analysis_payload(analysis_payload):
    """Serialize *analysis_payload* to JSON and compress it for upload."""
    encoded = json.dumps(analysis_payload).encode()
    return compress(encoded)
def compress(data: bytes):

View File

@ -1,8 +0,0 @@
import abc
# TODO: rename — implementations are used as boolean dispatch predicates
# (see IdentifierDispatchCallback); a name like DispatchPredicate would be clearer.
class DispatchCallback(abc.ABC):
    """Callable interface invoked with a payload.

    Concrete subclasses must implement ``__call__``; instantiating this
    base class directly raises ``TypeError``.
    """

    @abc.abstractmethod
    def __call__(self, payload):
        """Process *payload*; behavior is defined by the subclass."""

View File

@ -1,21 +0,0 @@
from _operator import itemgetter
from pyinfra.visitor.dispatch.dispatch import DispatchCallback
class IdentifierDispatchCallback(DispatchCallback):
    """Dispatch predicate that fires when the payload identifier changes.

    The identifier is derived from the ``"fileId"`` and ``"dossierId"``
    metadata entries. The first identifier seen is remembered; later calls
    report whether the incoming metadata belongs to a different one.
    """

    def __init__(self):
        # None is the explicit "nothing seen yet" sentinel.
        self.identifier = None

    def has_new_identifier(self, metadata):
        """Return True iff *metadata* carries a different identifier than the stored one.

        Raises KeyError if "fileId" or "dossierId" is missing from *metadata*.
        """
        identifier = ":".join(itemgetter("fileId", "dossierId")(metadata))
        # Bug fix: compare against None explicitly. The previous
        # `if not self.identifier` treated an empty identifier ("" from empty
        # fileId/dossierId) as "unset" and re-captured it on every call, so a
        # change away from "" was never reported.
        if self.identifier is None:
            self.identifier = identifier
        return identifier != self.identifier

    def __call__(self, metadata):
        return self.has_new_identifier(metadata)

View File

@ -1,4 +1,3 @@
import json
from collections import deque
from typing import Callable
@ -6,18 +5,15 @@ from funcy import omit, filter
from more_itertools import peekable
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.encoding import compress
from pyinfra.visitor.dispatch.dispatch import DispatchCallback
from pyinfra.visitor.dispatch.identifier_dispatch import IdentifierDispatchCallback
from pyinfra.utils.encoding import pack_analysis_payload
from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.utils import build_storage_upload_info
class AggregationStorageStrategy(ResponseStrategy):
def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None):
def __init__(self, storage, merger: Callable = None):
self.storage = storage
self.merger = merger or list
self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
self.buffer = deque()
def handle_response(self, analysis_response, final=False):
@ -33,26 +29,26 @@ class AggregationStorageStrategy(ResponseStrategy):
"""analysis_payload : {data: ..., metadata: ...}"""
storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata)
analysis_payload["metadata"].pop("id")
if analysis_payload["data"]:
return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info)
else:
self.buffer.append(analysis_payload)
if last or self.dispatch_callback(storage_upload_info):
return self.upload_queue_items(storage_upload_info)
else:
return Nothing
def put_object(self, data: bytes, storage_upload_info):
    """Compress *data*, write it to storage, and report the object name.

    Returns *storage_upload_info* augmented with a "responseFile" entry
    naming the stored object.
    """
    descriptor = self.get_response_object_descriptor(storage_upload_info)
    self.storage.put_object(**descriptor, data=compress(data))
    result = dict(storage_upload_info)
    result["responseFile"] = descriptor["object_name"]
    return result
def upload_queue_items(self, storage_upload_info):
    """Merge the buffered payloads and upload them as a single object."""
    merged = self.merge_queue_items()
    encoded = json.dumps(merged).encode()
    return self.put_object(encoded, storage_upload_info)
self.add_analysis_payload_to_buffer(analysis_payload)
if analysis_payload["data"] or last:
self.upload_aggregated_items(object_descriptor)
# TODO: mappings such as object_name -> responseFile should be put in a separate interface mapping layer.
# See the enum-formatter in image-prediction service for reference.
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
else:
return Nothing
def add_analysis_payload_to_buffer(self, analysis_payload):
    """Queue *analysis_payload* for later aggregation, dropping the internal "id" metadata key."""
    metadata = {key: value for key, value in analysis_payload["metadata"].items() if key != "id"}
    self.buffer.append({**analysis_payload, "metadata": metadata})
def upload_item(self, analysis_payload, object_descriptor):
    """Pack one analysis payload and store it under *object_descriptor*."""
    packed = pack_analysis_payload(analysis_payload)
    self.storage.put_object(data=packed, **object_descriptor)
def upload_aggregated_items(self, object_descriptor):
    """Merge everything buffered so far and upload the result as one item."""
    merged = self.merge_queue_items()
    return self.upload_item(merged, object_descriptor)
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)

View File

@ -38,7 +38,7 @@ from test.utils.input import pair_data_with_queue_message
@pytest.mark.parametrize(
"analysis_task",
[
# False,
False,
True,
],
)

View File

@ -5,7 +5,7 @@ import pytest
from funcy import lmapcat
from pyinfra.exceptions import ProcessingFailure
from pyinfra.utils.encoding import pack_for_upload
from test.utils.storage import pack_for_upload
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy

View File

@ -2,7 +2,8 @@ import json
import pytest
from pyinfra.utils.encoding import pack_for_upload, decompress
from pyinfra.utils.encoding import decompress
from test.utils.storage import pack_for_upload
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy

8
test/utils/storage.py Normal file
View File

@ -0,0 +1,8 @@
import json
from pyinfra.server.packing import bytes_to_string
from pyinfra.utils.encoding import compress
def pack_for_upload(data: bytes):
    """JSON-encode *data* (via bytes_to_string) and return the gzip-compressed bytes."""
    as_text = bytes_to_string(data)
    return compress(json.dumps(as_text).encode())