added aggregation strategy test + fixed bug in check for empty data in refactored aggregation strategy

This commit is contained in:
Matthias Bisping 2022-06-20 14:03:01 +02:00
parent e73cfe5d87
commit 0d455422b7
5 changed files with 70 additions and 12 deletions

View File

@ -18,12 +18,11 @@ class AggregationStorageStrategy(ResponseStrategy):
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))
request_metadata = omit(analysis_response, ["analysis_payloads"])
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not analysis_payloads.peek(False))
request_metadata = omit(analysis_response, ["data"])
result_data = peekable(analysis_response["data"])
yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))
analysis_payloads = peekable(analysis_response["analysis_payloads"])
yield from filter(is_not_nothing, map(upload_or_aggregate, analysis_payloads))
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
"""analysis_payload : {data: ..., metadata: ...}"""
@ -33,7 +32,7 @@ class AggregationStorageStrategy(ResponseStrategy):
self.add_analysis_payload_to_buffer(analysis_payload)
if analysis_payload["data"] or last:
if not analysis_payload["data"] or last:
self.upload_aggregated_items(object_descriptor)
# TODO: mappings such as object_name -> responseFile should be put in a separate interface mapping layer.
# See the enum-formatter in image-prediction service for reference.

View File

@ -11,6 +11,6 @@ class StorageStrategy(ResponseStrategy):
def handle_response(self, body: dict):
response_object_descriptor = self.get_response_object_descriptor(body)
self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode()))
body.pop("data")
body.pop("analysis_payloads")
body["responseFile"] = response_object_descriptor["object_name"]
return body

View File

@ -40,8 +40,8 @@ class QueueVisitor:
self.response_strategy = response_strategy or StorageStrategy()
def __call__(self, queue_item_body):
analysis_result_body = self.load_items_from_storage_and_process_with_callback(queue_item_body)
return self.response_strategy(analysis_result_body)
analysis_response = self.load_items_from_storage_and_process_with_callback(queue_item_body)
return self.response_strategy(analysis_response)
def load_items_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
@ -52,7 +52,7 @@ class QueueVisitor:
self.load_data,
)(queue_item_body)
return {"data": callback_results, **queue_item_body}
return {"analysis_payloads": callback_results, **queue_item_body}
def get_item_processor(self, queue_item_body):
def process_storage_item(storage_item):

View File

@ -28,7 +28,7 @@ class TestVisitor:
storage.clear_bucket(bucket_name)
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2"))
response_body = visitor.load_items_from_storage_and_process_with_callback(body)
assert response_body["data"] == ["22"]
assert response_body["analysis_payloads"] == ["22"]
@pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name):
@ -38,4 +38,4 @@ class TestVisitor:
assert "data" not in response_body
assert json.loads(
decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"]))
)["data"] == ["22"]
)["analysis_payloads"] == ["22"]

View File

@ -0,0 +1,59 @@
import pytest
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
@pytest.mark.parametrize("client_name", ["mock"], scope="session")
class TestAggregationStorageStrategy:
def test_aggregation_strategy_with_no_empty_data_field(self, storage):
strat = AggregationStorageStrategy(storage=storage)
analysis_response = {
"analysis_payloads": [
{"data": [1], "metadata": {"id": 1}},
{"data": [3], "metadata": {"id": 3}},
],
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
}
response_message_bodies = [*strat(analysis_response)]
assert response_message_bodies == [
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 3,
"responseFile": "dossier0/file0/id:3.json.gz",
}
]
def test_aggregation_strategy_with_empty_data_field(self, storage):
strat = AggregationStorageStrategy(storage=storage)
analysis_response = {
"analysis_payloads": [
{"data": None, "metadata": {"id": 1}},
{"data": [3], "metadata": {"id": 3}},
],
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
}
response_message_bodies = [*strat(analysis_response)]
assert response_message_bodies == [
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 1,
"responseFile": "dossier0/file0/id:1.json.gz",
},
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 3,
"responseFile": "dossier0/file0/id:3.json.gz",
},
]