- modified aggr strat to produce only one response message, which now contains a list of response files instead of a single response file

- introduced response formatter
This commit is contained in:
Matthias Bisping 2022-06-20 15:03:13 +02:00
parent 3a6e9281ce
commit 343637aaf7
14 changed files with 82 additions and 31 deletions

View File

@ -8,6 +8,7 @@ service:
# Specifies, how to handle the `page` key of a request. "multi" will download all pages matching the list of pages
# specified in the request
download_strategy: $DOWNLOAD_STRATEGY|single
response_formatter: default
probing_webserver:
host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address

View File

@ -15,6 +15,8 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.storage import storages
from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
logger = logging.getLogger(__name__)
@ -27,7 +29,12 @@ def get_consumer(callback=None):
@lru_cache(maxsize=None)
def get_visitor(callback):
return QueueVisitor(storage=get_storage(), callback=callback, response_strategy=get_response_strategy())
return QueueVisitor(
storage=get_storage(),
callback=callback,
response_strategy=get_response_strategy(),
response_formatter=get_response_formatter(),
)
@lru_cache(maxsize=None)
@ -59,6 +66,14 @@ def get_response_strategy(storage=None):
return AggregationStorageStrategy(storage or get_storage())
@lru_cache(maxsize=None)
def get_response_formatter():
return {
"default": DefaultResponseFormatter(),
"identity": IdentityResponseFormatter()
}[CONFIG.service.response_formatter]
class Callback:
def __init__(self, base_url):
self.base_url = base_url

View File

@ -0,0 +1,10 @@
import abc
class ResponseFormatter(abc.ABC):
def __call__(self, message):
return self.format(message)
@abc.abstractmethod
def format(self, message):
pass

View File

@ -0,0 +1,13 @@
from funcy import first
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
class DefaultResponseFormatter(ResponseFormatter):
"""
TODO: Extend via using enums throughout the codebase instead of strings.
See the enum-formatter in image-prediction service for reference.
"""
def format(self, message):
return {**message, "responseFile": first(message["response_files"])}

View File

@ -0,0 +1,6 @@
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
class IdentityResponseFormatter(ResponseFormatter):
def format(self, message):
return message

View File

@ -15,6 +15,7 @@ class AggregationStorageStrategy(ResponseStrategy):
self.storage = storage
self.merger = merger or list
self.buffer = deque()
self.response_files = deque()
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
@ -34,23 +35,26 @@ class AggregationStorageStrategy(ResponseStrategy):
if analysis_payload["data"] or last:
self.upload_aggregated_items(object_descriptor)
# TODO: mappings such as object_name -> responseFile should be put in a separate interface mapping layer.
# See the enum-formatter in image-prediction service for reference.
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
else:
return Nothing
self.response_files.append(object_descriptor["object_name"])
# TODO: aggregate response files and make responseFile -> responseFiles
return self.build_response_message(storage_upload_info) if last else Nothing
def add_analysis_payload_to_buffer(self, analysis_payload):
self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])})
def upload_aggregated_items(self, object_descriptor):
self.upload_item(self.merge_queue_items(), object_descriptor)
def build_response_message(self, storage_upload_info):
response_files = [*self.response_files]
self.response_files.clear()
return {**storage_upload_info, "response_files": response_files}
def upload_item(self, analysis_payload, object_descriptor):
self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload))
def upload_aggregated_items(self, object_descriptor):
return self.upload_item(self.merge_queue_items(), object_descriptor)
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)
self.buffer.clear()

View File

@ -12,5 +12,5 @@ class StorageStrategy(ResponseStrategy):
response_object_descriptor = self.get_response_object_descriptor(body)
self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode()))
body.pop("analysis_payloads")
body["responseFile"] = response_object_descriptor["object_name"]
body["response_files"] = [response_object_descriptor["object_name"]]
return body

View File

@ -4,9 +4,11 @@ from funcy import lflatten, compose
from pyinfra.storage.storage import Storage
from pyinfra.utils.func import lift
from pyinfra.visitor.strategies.download.download import DownloadStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.download.download import DownloadStrategy
from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.strategies.response.storage import StorageStrategy
from pyinfra.visitor.utils import standardize, get_download_strategy
@ -20,6 +22,7 @@ class QueueVisitor:
download_strategy: DownloadStrategy = None,
parsing_strategy: BlobParsingStrategy = None,
response_strategy: ResponseStrategy = None,
response_formatter: ResponseFormatter = None,
):
"""Processes queue messages that specify items on a storage to process with a given callback.
@ -38,10 +41,14 @@ class QueueVisitor:
self.download_strategy = download_strategy or get_download_strategy()
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
self.response_strategy = response_strategy or StorageStrategy()
self.response_formatter = response_formatter or IdentityResponseFormatter()
def __call__(self, queue_item_body):
analysis_response = self.load_items_from_storage_and_process_with_callback(queue_item_body)
return self.response_strategy(analysis_response)
response = self.response_strategy(analysis_response)
response = self.response_formatter(response)
return response
def load_items_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""

View File

@ -4,6 +4,8 @@ from pyinfra.config import CONFIG as MAIN_CONFIG
MAIN_CONFIG["retry"]["delay"] = 0.1
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)
MAIN_CONFIG["service"]["response_formatter"] = "identity"
import logging
import time
from unittest.mock import Mock

View File

@ -4,7 +4,7 @@ from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, lpluck, first, second
from funcy import compose, lpluck, first, second, pluck, lflatten
from pyinfra.default_objects import (
get_callback,
@ -172,7 +172,7 @@ def build_filepath(object_descriptor, page):
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
names_of_uploaded_files = lflatten(pluck("response_files", queue_manager.output_queue.to_list()))
uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
return outputs

View File

@ -1,6 +1,7 @@
import json
import pytest
from funcy import first
from pyinfra.utils.encoding import decompress
from test.utils.storage import pack_for_upload
@ -37,5 +38,5 @@ class TestVisitor:
response_body = visitor(body)
assert "data" not in response_body
assert json.loads(
decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"]))
decompress(storage.get_object(bucket_name=bucket_name, object_name=first(response_body["response_files"])))
)["analysis_payloads"] == ["22"]

View File

@ -7,7 +7,7 @@ from pyinfra.visitor.strategies.response.aggregation import AggregationStorageSt
@pytest.mark.parametrize("client_name", ["mock"], scope="session")
class TestAggregationStorageStrategy:
def test_aggregation_strategy_with_no_empty_data_field(self, storage):
def test_aggregation_strategy_with_no_empty_data_field_causes_two_uploads(self, storage):
strat = AggregationStorageStrategy(storage=storage)
analysis_response = {
@ -21,24 +21,16 @@ class TestAggregationStorageStrategy:
}
response_message_bodies = [*strat(analysis_response)]
assert response_message_bodies == [
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 1,
"responseFile": "dossier0/file0/id:1.json.gz",
},
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 3,
"responseFile": "dossier0/file0/id:3.json.gz",
},
"response_files": ["dossier0/file0/id:1.json.gz", "dossier0/file0/id:3.json.gz"],
}
]
def test_aggregation_strategy_with_empty_data_field(self, storage):
def test_aggregation_strategy_with_empty_data_field_causes_single_uploads(self, storage):
strat = AggregationStorageStrategy(storage=storage)
analysis_response = {
@ -57,6 +49,6 @@ class TestAggregationStorageStrategy:
"fileId": "file0",
"pages": [0, 2],
"id": 3,
"responseFile": "dossier0/file0/id:3.json.gz",
},
"response_files": ["dossier0/file0/id:3.json.gz"],
}
]