158 lines
3.8 KiB
Python
158 lines
3.8 KiB
Python
import gzip
|
|
import json
|
|
from itertools import starmap, repeat, chain
|
|
from operator import itemgetter
|
|
|
|
import pytest
|
|
from funcy import compose, lzip, lpluck
|
|
|
|
from pyinfra.default_objects import (
|
|
get_callback,
|
|
get_response_strategy,
|
|
get_consumer,
|
|
get_queue_manager,
|
|
get_storage,
|
|
)
|
|
from pyinfra.queue.consumer import Consumer
|
|
from pyinfra.server.packing import bytes_to_string, unpack, pack
|
|
from pyinfra.visitor import get_object_descriptor, QueueVisitor
|
|
from test.utils.input import pair_data_with_queue_message
|
|
|
|
|
|
@pytest.fixture
def test_components(url, queue_manager, storage):
    """Build a consumer by hand on top of the injected queue manager and storage.

    A callback for ``url`` and a response strategy for ``storage`` are combined into
    a ``QueueVisitor``, which is then wrapped in a ``Consumer`` bound to
    ``queue_manager``.

    Returns:
        A ``(storage, queue_manager, consumer)`` tuple.
    """
    # Arguments are evaluated left to right, so the callback is still created
    # before the response strategy.
    visitor = QueueVisitor(
        storage,
        get_callback(url),
        get_response_strategy(storage),
    )
    return storage, queue_manager, Consumer(visitor, queue_manager)
|
|
|
|
|
|
@pytest.fixture
def real_components(url):
    """Build the component triple from the ``pyinfra.default_objects`` factories.

    The consumer is created from a callback for ``url``; the queue manager and the
    storage come from their respective argument-less factory functions.

    Returns:
        A ``(storage, queue_manager, consumer)`` tuple.
    """
    consumer = get_consumer(get_callback(url))
    queue_manager = get_queue_manager()
    # The storage is created last, matching the original construction order.
    return get_storage(), queue_manager, consumer
|
|
|
|
|
|
@pytest.fixture
def components(components_type, real_components, test_components):
    """Select the component triple matching ``components_type``.

    Args:
        components_type: ``"real"`` or ``"test"``.
        real_components: Value of the ``real_components`` fixture.
        test_components: Value of the ``test_components`` fixture.

    Returns:
        ``real_components`` for ``"real"``, ``test_components`` for ``"test"``.

    Raises:
        ValueError: If ``components_type`` is neither ``"real"`` nor ``"test"``.
    """
    if components_type == "real":
        return real_components
    if components_type == "test":
        return test_components
    raise ValueError(f"Unknown components type '{components_type}'.")
|
|
|
|
|
|
@pytest.fixture(params=["real", "test"])
def components_type(request):
    """Return the component flavour for the current fixture parametrization.

    The parameter values have to be ones the ``components`` fixture can dispatch
    on, i.e. ``"real"`` or ``"test"``; any other value makes ``components`` raise
    ``ValueError``. Tests may still override this fixture through
    ``pytest.mark.parametrize("components_type", ...)``.
    """
    return request.param
|
|
|
|
|
|
def decode(storage_item):
    """Lazily yield the unpacked entries of a JSON-encoded storage item.

    ``storage_item`` is decoded via its ``decode()`` method and parsed as JSON. A
    top-level JSON list is treated as a sequence of entries; any other JSON value
    is treated as a single entry. Each entry is passed through ``unpack``.

    Args:
        storage_item: Object whose ``decode()`` returns JSON text.

    Yields:
        The result of ``unpack`` for every entry, in order.
    """
    parsed = json.loads(storage_item.decode())
    entries = parsed if isinstance(parsed, list) else [parsed]

    for entry in entries:
        yield unpack(entry)
|
|
|
|
|
|
@pytest.mark.parametrize("one_to_many", [False, True])
@pytest.mark.parametrize("analysis_task", [False, True])
@pytest.mark.parametrize("n_items", [1, 3])
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize("storage_item_has_metadata", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        # "mock",
        "s3",
        "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
def test_serving(
    server_process,
    input_data_items,
    metadata,
    bucket_name,
    components,
    storage_item_has_metadata,
    target_data_items,
    targets,
):
    """Round-trip items through storage, the request queue and the consumer.

    Steps:
      1. Empty the queue and the bucket.
      2. Serialize every input item to JSON bytes, either packed together with
         its metadata or as the bare item converted via ``bytes_to_string``.
      3. Upload each gzip-compressed payload and publish its queue message.
      4. Let the consumer process as many messages as were published.
      5. Download every file named by a ``responseFile`` entry of the output
         queue, decode it and compare the sorted results with the sorted targets.
      6. Empty the bucket again.
    """
    storage, queue_manager, consumer = components
    first_element = itemgetter(0)

    queue_manager.clear()
    storage.clear_bucket(bucket_name)

    if storage_item_has_metadata:
        data_metadata_packs = (
            json.dumps(pack(data, item_metadata)).encode()
            for data, item_metadata in zip(input_data_items, metadata)
        )
    else:
        data_metadata_packs = (json.dumps(bytes_to_string(data)).encode() for data in input_data_items)
        # Without stored metadata every target is expected to carry an empty dict.
        metadata = repeat({})
        targets = lzip(target_data_items, metadata)

    targets = sorted(targets, key=first_element)

    data_message_pairs = pair_data_with_queue_message(data_metadata_packs)

    for payload, message in data_message_pairs:
        descriptor = get_object_descriptor(message)
        storage.put_object(**descriptor, data=gzip.compress(payload))
        queue_manager.publish_request(message)

    consumer.consume_and_publish(n=len(data_message_pairs))

    responses = queue_manager.output_queue.to_list()
    names_of_uploaded_files = [response["responseFile"] for response in responses]
    # Fetch all result files before decoding any of them.
    uploaded_files = [storage.get_object(bucket_name, name) for name in names_of_uploaded_files]

    outputs = [item for uploaded_file in uploaded_files for item in decode(uploaded_file)]
    outputs.sort(key=first_element)
    assert outputs == targets

    storage.clear_bucket(bucket_name)
|