From 2d1ec1671435df243421a9f5e21e441344f7a1ec Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 25 May 2022 16:56:08 +0200 Subject: [PATCH] modified serve test to use components from fixtures; response file path depending on response metadata and request page index WIP --- pyinfra/default_objects.py | 4 +- pyinfra/visitor.py | 36 +++++++--- test/fixtures/server.py | 5 +- test/integration_tests/serve_test.py | 101 +++++++++++++++++++++------ test/utils/input.py | 1 + 5 files changed, 110 insertions(+), 37 deletions(-) diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index ee11efb..3beee27 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -44,8 +44,8 @@ def get_callback(analysis_endpoint=None): @lru_cache(maxsize=None) -def get_response_strategy(): - return AggregationStorageStrategy(get_storage()) +def get_response_strategy(storage=None): + return AggregationStorageStrategy(storage or get_storage()) @lru_cache(maxsize=None) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index bb0eb85..70d2d21 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -1,8 +1,10 @@ import abc import gzip +import hashlib import json import logging import random +import time from collections import deque from operator import itemgetter from typing import Callable @@ -15,6 +17,15 @@ from pyinfra.server.packing import string_to_bytes, bytes_to_string from pyinfra.storage.storage import Storage +def unique_hash(pages, seed=""): + assert isinstance(seed, str) + pages_str = "-".join(pages) + seed = seed or str(time.time()) + rand_str = (pages_str + seed).encode(encoding="UTF-8", errors="strict") + hsh = hashlib.md5(rand_str).hexdigest() + return hsh + + def get_object_name(body): dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body) object_name = f"{dossier_id}/{file_id}.{target_file_extension}" @@ -22,8 +33,10 @@ def get_object_name(body): def get_response_object_name(body): - dossier_id, file_id, response_file_extension = itemgetter("dossierId", "fileId", "responseFileExtension")(body) - object_name = f"{dossier_id}/{file_id}.{response_file_extension}" + dossier_id, file_id, pages, response_file_extension = itemgetter( + "dossierId", "fileId", "pages", "responseFileExtension" + )(body) + object_name = f"{dossier_id}/{file_id}_{unique_hash(pages)}.{response_file_extension}" return object_name @@ -34,8 +47,7 @@ def get_object_descriptor(body): def get_response_object_descriptor(body): return { "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), - "object_name": get_response_object_name(body) - + str(random.randint(0, 100)), # TODO: this random suffix should be built by some policy + "object_name": get_response_object_name(body), } @@ -101,6 +113,7 @@ class AggregationStorageStrategy(ResponseStrategy): def put_object(self, data: bytes, metadata): object_descriptor = get_response_object_descriptor(metadata) + # TODO: object_descriptor needs suffix self.storage.put_object(**object_descriptor, data=data) def merge_queue_items(self): @@ -112,15 +125,18 @@ class AggregationStorageStrategy(ResponseStrategy): data = json.dumps(self.merge_queue_items()).encode() self.put_object(data, metadata) - def upload_or_aggregate(self, data, metadata): + def upload_or_aggregate(self, analysis_payload, request_metadata): + """ + analysis_payload : {data: ..., metadata: ...} + """ - if isinstance(data, str): - self.put_object(data.encode(), metadata) + if isinstance(analysis_payload, str): + self.put_object(analysis_payload.encode(), request_metadata) else: - self.buffer.append(data) - if self.dispatch_callback(metadata): - self.upload_queue_items(metadata) + self.buffer.append(analysis_payload) + if self.dispatch_callback(request_metadata): + self.upload_queue_items(request_metadata) def handle_response(self, payload, final=False): request_metadata = omit(payload, ["result_data"]) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 6c94fc2..c03d05f 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -109,8 +109,9 @@ def core_operation(item_type, one_to_many, analysis_task): try: return params2op[one_to_many][item_type][analysis_task] except KeyError: - pytest.skip(f"No operation defined for parameter combination.") - logger.debug(f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}].") + msg = f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}]." + pytest.skip(msg) + logger.debug(msg) return Nothing diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 9400d44..394887c 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -2,15 +2,20 @@ import gzip import json import logging from itertools import starmap, repeat, chain +from operator import itemgetter import pytest from frozendict import frozendict from funcy import lfilter, compose, lzip -from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback +from pyinfra.default_objects import ( + get_callback, + get_response_strategy, +) +from pyinfra.queue.consumer import Consumer from pyinfra.server.packing import bytes_to_string, unpack, pack from pyinfra.utils.func import star -from pyinfra.visitor import get_object_descriptor +from pyinfra.visitor import get_object_descriptor, QueueVisitor from test.utils.input import adorn_data_with_storage_info logger = logging.getLogger(__name__) @@ -20,30 +25,87 @@ def freeze(data, metadata): return data, frozendict(metadata) -@pytest.mark.parametrize("one_to_many", [False, True]) -@pytest.mark.parametrize("analysis_task", [False, True]) +@pytest.fixture +def components(endpoint, bucket_name, queue_manager, storage): + + callback = get_callback(endpoint) + consumer = Consumer(callback, queue_manager) + + visitor = QueueVisitor(storage, callback, get_response_strategy(storage)) + + return visitor, queue_manager, storage, consumer + + +def decode(storage_item): + storage_item = json.loads(storage_item.decode()) + if not isinstance(storage_item, list): + storage_item = [storage_item] + + yield from map(compose(star(freeze), unpack), storage_item) + + +@pytest.mark.parametrize( + "one_to_many", + [ + False, + True, + ], +) +@pytest.mark.parametrize( + "analysis_task", + [ + False, + True, + ], +) @pytest.mark.parametrize("n_items", [2]) @pytest.mark.parametrize("n_pages", [1]) @pytest.mark.parametrize("buffer_size", [2]) -@pytest.mark.parametrize("storage_item_has_metadata", [True, False]) -@pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) +@pytest.mark.parametrize( + "storage_item_has_metadata", + [ + True, + False, + ], +) +@pytest.mark.parametrize( + "item_type", + [ + "string", + "image", + "pdf", + ], +) +@pytest.mark.parametrize( + "queue_manager_name", + [ + "mock", + # "pika", + ], + scope="session", +) +@pytest.mark.parametrize( + "client_name", + [ + "mock", + # "s3", + # "azure", + ], + scope="session", +) def test_serving( server_process, input_data_items, + unencoded_input_data, metadata, bucket_name, - endpoint, + components, core_operation, storage_item_has_metadata, target_data_items, targets, ): - - callback = get_callback(endpoint) - visitor = get_visitor(callback) - queue_manager = get_queue_manager() - storage = get_storage() - consumer = get_consumer(callback) + visitor, queue_manager, storage, consumer = components queue_manager.clear() storage.clear_bucket(bucket_name) @@ -55,7 +117,7 @@ def test_serving( metadata = repeat({}) targets = lzip(target_data_items, metadata) - targets = {*starmap(freeze, targets)} + targets = sorted(starmap(freeze, targets), key=itemgetter(0)) adorned_data_metadata_packs = adorn_data_with_storage_info(data_metadata_packs) @@ -65,18 +127,11 @@ def test_serving( reqs = consumer.consume(inactivity_timeout=5) - for itm, req in zip(adorned_data_metadata_packs, reqs): + for _, req in zip(adorned_data_metadata_packs, reqs): queue_manager.publish_response(req, visitor) - def decode(storage_item): - storage_item = json.loads(storage_item.decode()) - if not isinstance(storage_item, list): - storage_item = [storage_item] - - yield from map(compose(star(freeze), unpack), storage_item) - names_of_uploaded_files = lfilter(".out", storage.get_all_object_names(bucket_name)) uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files] - outputs = {*chain(*map(decode, uploaded_files))} + outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0)) assert outputs == targets diff --git a/test/utils/input.py b/test/utils/input.py index b760040..ccd3768 100644 --- a/test/utils/input.py +++ b/test/utils/input.py @@ -9,6 +9,7 @@ def adorn_data_with_storage_info(data: Iterable[bytes]): "fileId": f"file{i}", "targetFileExtension": "in.gz", "responseFileExtension": "out.gz", + "pages": [] } yield d, body