2022-05-31 16:22:27 +02:00

159 lines
3.8 KiB
Python

import gzip
import json
import logging
from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, lzip, lpluck
from pyinfra.default_objects import (
get_callback,
get_response_strategy,
get_consumer,
get_queue_manager,
get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import bytes_to_string, unpack, pack
from pyinfra.visitor import get_object_descriptor, QueueVisitor
from test.utils.input import pair_data_with_queue_message
# Module-scoped logger, named after this module.
logger = logging.getLogger(name=__name__)
@pytest.fixture
def test_components(url, queue_manager, storage):
    """Assemble a consumer around the injected ``storage`` and ``queue_manager`` fixtures.

    Only the callback is derived from ``url`` (via ``get_callback``); the response
    strategy is built for the same ``storage`` instance the visitor reads from.

    Returns:
        A ``(storage, queue_manager, consumer)`` tuple.
    """
    # Arguments evaluate left to right: the callback is created before the response strategy.
    visitor = QueueVisitor(storage, get_callback(url), get_response_strategy(storage))
    return storage, queue_manager, Consumer(visitor, queue_manager)
@pytest.fixture
def real_components(url):
    """Build storage, queue manager and consumer through the pyinfra default factories.

    Only ``url`` comes from the test fixtures; ``get_queue_manager()`` and
    ``get_storage()`` take no arguments here (presumably configured from project
    settings — TODO confirm).

    Returns:
        A ``(storage, queue_manager, consumer)`` tuple.
    """
    real_consumer = get_consumer(get_callback(url))
    real_queue_manager = get_queue_manager()
    real_storage = get_storage()
    return real_storage, real_queue_manager, real_consumer
@pytest.fixture
def components(components_type, real_components, test_components):
    """Return the ``(storage, queue_manager, consumer)`` triple selected by ``components_type``.

    Both candidate triples are requested as fixture arguments. They are therefore
    built regardless of which one is returned.

    Raises:
        ValueError: if ``components_type`` is neither ``"real"`` nor ``"test"``.
    """
    if components_type not in ("real", "test"):
        raise ValueError(f"Unknown components type '{components_type}'.")
    return real_components if components_type == "real" else test_components
@pytest.fixture(params=["real", "test"])
def components_type(request):
    """Default selector for the ``components`` fixture when a test does not parametrize it.

    Every value listed here must be accepted by ``components``, which knows only
    ``"real"`` and ``"test"``. The previous ``"mixed"`` entry made every test that
    relied on this default fail with ``ValueError``.
    """
    return request.param
def decode(storage_item):
    """Lazily yield the unpacked entries of a JSON-encoded stored object.

    ``storage_item`` is decoded with ``.decode()`` and parsed as JSON (presumably
    UTF-8 ``bytes`` from storage — TODO confirm). A JSON list yields one entry per
    element. Any other JSON value is treated as a single-element list. Each entry
    is passed through ``unpack`` before being yielded.
    """
    parsed = json.loads(storage_item.decode())
    entries = parsed if isinstance(parsed, list) else [parsed]
    for entry in entries:
        yield unpack(entry)
@pytest.mark.parametrize(
    "one_to_many",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize("n_items", [1, 3])
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize(
    "storage_item_has_metadata",
    [
        True,
        False,
    ],
)
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        "image",
        "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        # "mock",
        "s3",
        "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
def test_serving(
    server_process,
    input_data_items,
    metadata,
    bucket_name,
    components,
    storage_item_has_metadata,
    target_data_items,
    targets,
):
    """End-to-end check that items put into storage and announced on the queue are served back as ``targets``.

    Several parametrized names (``one_to_many``, ``analysis_task``, ``n_items``, ``n_pages``,
    ``buffer_size``, ``item_type``, ``queue_manager_name``, ``client_name``) are not arguments
    of this function. Presumably they are consumed by fixtures defined elsewhere (e.g. a
    conftest) — TODO confirm.

    Steps:
    1. Clear the queue manager and the bucket.
    2. Serialize every input item: packed together with its metadata, or as a bare string
       when ``storage_item_has_metadata`` is False. Gzip it into storage and publish one
       request message per item.
    3. Let the consumer process exactly as many messages as were published.
    4. Fetch every file named by a ``responseFile`` entry on the output queue and decode it.
       Compare against the targets, both sorted by their first element so ordering does not
       matter.
    """
    storage, queue_manager, consumer = components

    queue_manager.clear()
    storage.clear_bucket(bucket_name)

    if storage_item_has_metadata:
        data_metadata_packs = starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata))
    else:
        data_metadata_packs = map(compose(lambda s: s.encode(), json.dumps, bytes_to_string), input_data_items)
        # No metadata is stored, so each expected output pairs its target with an empty dict.
        metadata = repeat({})
        targets = lzip(target_data_items, metadata)
    targets = sorted(targets, key=itemgetter(0))

    # Materialize the pairs: they are iterated below and their count is needed afterwards,
    # which a one-shot iterator would not support.
    data_message_pairs = list(pair_data_with_queue_message(data_metadata_packs))

    for data, message in data_message_pairs:
        storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
        queue_manager.publish_request(message)

    consumer.consume_and_publish(n=len(data_message_pairs))

    names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
    uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
    # decode() may yield several entries per stored file; flatten lazily before sorting.
    outputs = sorted(chain.from_iterable(map(decode, uploaded_files)), key=itemgetter(0))

    assert outputs == targets