modified serve test to use components from fixtures; response file path depending on response metadata and request page index WIP

This commit is contained in:
Matthias Bisping 2022-05-25 16:56:08 +02:00
parent 9e2ed6a9f9
commit 2d1ec16714
5 changed files with 110 additions and 37 deletions

View File

@ -44,8 +44,8 @@ def get_callback(analysis_endpoint=None):
@lru_cache(maxsize=None)
def get_response_strategy():
return AggregationStorageStrategy(get_storage())
def get_response_strategy(storage=None):
return AggregationStorageStrategy(storage or get_storage())
@lru_cache(maxsize=None)

View File

@ -1,8 +1,10 @@
import abc
import gzip
import hashlib
import json
import logging
import random
import time
from collections import deque
from operator import itemgetter
from typing import Callable
@ -15,6 +17,15 @@ from pyinfra.server.packing import string_to_bytes, bytes_to_string
from pyinfra.storage.storage import Storage
def unique_hash(pages, seed=""):
assert isinstance(seed, str)
pages_str = "-".join(pages)
seed = seed or str(time.time())
rand_str = (pages_str + seed).encode(encoding="UTF-8", errors="strict")
hsh = hashlib.md5(rand_str).hexdigest()
return hsh
def get_object_name(body):
dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body)
object_name = f"{dossier_id}/{file_id}.{target_file_extension}"
@ -22,8 +33,10 @@ def get_object_name(body):
def get_response_object_name(body):
dossier_id, file_id, response_file_extension = itemgetter("dossierId", "fileId", "responseFileExtension")(body)
object_name = f"{dossier_id}/{file_id}.{response_file_extension}"
dossier_id, file_id, pages, response_file_extension = itemgetter(
"dossierId", "fileId", "pages", "responseFileExtension"
)(body)
object_name = f"{dossier_id}/{file_id}_{unique_hash(pages)}.{response_file_extension}"
return object_name
@ -34,8 +47,7 @@ def get_object_descriptor(body):
def get_response_object_descriptor(body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": get_response_object_name(body)
+ str(random.randint(0, 100)), # TODO: this random suffix should be built by some policy
"object_name": get_response_object_name(body),
}
@ -101,6 +113,7 @@ class AggregationStorageStrategy(ResponseStrategy):
def put_object(self, data: bytes, metadata):
object_descriptor = get_response_object_descriptor(metadata)
# TODO: object_descriptor needs suffix
self.storage.put_object(**object_descriptor, data=data)
def merge_queue_items(self):
@ -112,15 +125,18 @@ class AggregationStorageStrategy(ResponseStrategy):
data = json.dumps(self.merge_queue_items()).encode()
self.put_object(data, metadata)
def upload_or_aggregate(self, data, metadata):
def upload_or_aggregate(self, analysis_payload, request_metadata):
"""
analysis_payload : {data: ..., metadata: ...}
"""
if isinstance(data, str):
self.put_object(data.encode(), metadata)
if isinstance(analysis_payload, str):
self.put_object(analysis_payload.encode(), request_metadata)
else:
self.buffer.append(data)
if self.dispatch_callback(metadata):
self.upload_queue_items(metadata)
self.buffer.append(analysis_payload)
if self.dispatch_callback(request_metadata):
self.upload_queue_items(request_metadata)
def handle_response(self, payload, final=False):
request_metadata = omit(payload, ["result_data"])

View File

@ -109,8 +109,9 @@ def core_operation(item_type, one_to_many, analysis_task):
try:
return params2op[one_to_many][item_type][analysis_task]
except KeyError:
pytest.skip(f"No operation defined for parameter combination.")
logger.debug(f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}].")
msg = f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}]."
pytest.skip(msg)
logger.debug(msg)
return Nothing

View File

@ -2,15 +2,20 @@ import gzip
import json
import logging
from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from frozendict import frozendict
from funcy import lfilter, compose, lzip
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
from pyinfra.default_objects import (
get_callback,
get_response_strategy,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import bytes_to_string, unpack, pack
from pyinfra.utils.func import star
from pyinfra.visitor import get_object_descriptor
from pyinfra.visitor import get_object_descriptor, QueueVisitor
from test.utils.input import adorn_data_with_storage_info
logger = logging.getLogger(__name__)
@ -20,30 +25,87 @@ def freeze(data, metadata):
return data, frozendict(metadata)
@pytest.mark.parametrize("one_to_many", [False, True])
@pytest.mark.parametrize("analysis_task", [False, True])
@pytest.fixture
def components(endpoint, bucket_name, queue_manager, storage):
callback = get_callback(endpoint)
consumer = Consumer(callback, queue_manager)
visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
return visitor, queue_manager, storage, consumer
def decode(storage_item):
storage_item = json.loads(storage_item.decode())
if not isinstance(storage_item, list):
storage_item = [storage_item]
yield from map(compose(star(freeze), unpack), storage_item)
@pytest.mark.parametrize(
"one_to_many",
[
False,
True,
],
)
@pytest.mark.parametrize(
"analysis_task",
[
False,
True,
],
)
@pytest.mark.parametrize("n_items", [2])
@pytest.mark.parametrize("n_pages", [1])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize("storage_item_has_metadata", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
@pytest.mark.parametrize(
"storage_item_has_metadata",
[
True,
False,
],
)
@pytest.mark.parametrize(
"item_type",
[
"string",
"image",
"pdf",
],
)
@pytest.mark.parametrize(
"queue_manager_name",
[
"mock",
# "pika",
],
scope="session",
)
@pytest.mark.parametrize(
"client_name",
[
"mock",
# "s3",
# "azure",
],
scope="session",
)
def test_serving(
server_process,
input_data_items,
unencoded_input_data,
metadata,
bucket_name,
endpoint,
components,
core_operation,
storage_item_has_metadata,
target_data_items,
targets,
):
callback = get_callback(endpoint)
visitor = get_visitor(callback)
queue_manager = get_queue_manager()
storage = get_storage()
consumer = get_consumer(callback)
visitor, queue_manager, storage, consumer = components
queue_manager.clear()
storage.clear_bucket(bucket_name)
@ -55,7 +117,7 @@ def test_serving(
metadata = repeat({})
targets = lzip(target_data_items, metadata)
targets = {*starmap(freeze, targets)}
targets = sorted(starmap(freeze, targets), key=itemgetter(0))
adorned_data_metadata_packs = adorn_data_with_storage_info(data_metadata_packs)
@ -65,18 +127,11 @@ def test_serving(
reqs = consumer.consume(inactivity_timeout=5)
for itm, req in zip(adorned_data_metadata_packs, reqs):
for _, req in zip(adorned_data_metadata_packs, reqs):
queue_manager.publish_response(req, visitor)
def decode(storage_item):
storage_item = json.loads(storage_item.decode())
if not isinstance(storage_item, list):
storage_item = [storage_item]
yield from map(compose(star(freeze), unpack), storage_item)
names_of_uploaded_files = lfilter(".out", storage.get_all_object_names(bucket_name))
uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files]
outputs = {*chain(*map(decode, uploaded_files))}
outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
assert outputs == targets

View File

@ -9,6 +9,7 @@ def adorn_data_with_storage_info(data: Iterable[bytes]):
"fileId": f"file{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
"pages": []
}
yield d, body