231 lines
6.1 KiB
Python

import gzip
import json
import re
from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, lpluck, first, second
from pyinfra.default_objects import (
get_callback,
get_response_strategy,
get_consumer,
get_queue_manager,
get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import unpack, pack
from pyinfra.visitor import get_object_descriptor, QueueVisitor, get_download_strategy
from test.utils.input import pair_data_with_queue_message
@pytest.mark.parametrize(
    "batched",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "one_to_many",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        False,
        True,
    ],
)
# NOTE: Because of a bug in pytest, this test fails when n_items has more than one entry and s3_backend is
# the last decorator. Yes, really -- keep this in mind before reordering the decorators.
@pytest.mark.parametrize(
    "s3_backend",
    [
        "minio",
        "aws",
    ],
)
@pytest.mark.parametrize(
    "n_items",
    [
        1,
        3,
    ],
)
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [1, 2])
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        "image",
        "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        "mock",
        # "s3",
        # "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
@pytest.mark.parametrize(
    "many_to_n",
    [
        False,
        True,
    ],
)
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
    """Push test items through the consumer end to end and compare its uploads with ``targets``.

    Steps:
      1. Check that the input queue, the output queue and the ``bucket_name`` bucket start empty.
      2. Upload the payloads and publish requests. With ``many_to_n``, every payload is stored as
         a page under one reference object and a single request is published. Otherwise each
         payload gets its own object and its own request.
      3. Let the consumer process the published request(s).
      4. Decode the files referenced by the output queue and compare them with ``targets``,
         ignoring order.

    NOTE(review): most of the parametrized names above (batched, one_to_many, analysis_task,
    s3_backend, n_pages, buffer_size, item_type, queue_manager_name, client_name) are not
    arguments of this function. They are presumably consumed by fixtures defined outside this
    file (e.g. a conftest) -- confirm before removing any of them.
    """
    storage, queue_manager, consumer = components
    # Every run must start from empty queues and an empty bucket.
    assert queue_manager.input_queue.to_list() == []
    assert queue_manager.output_queue.to_list() == []
    assert [*storage.get_all_object_names(bucket_name)] == []
    if n_items:
        assert data_message_pairs
    if many_to_n:
        upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs)
    else:
        upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs)
    # many_to_n publishes exactly one request (int(True) == 1); otherwise one request per item,
    # since int(False) == 0 falls through to n_items.
    consumer.consume_and_publish(n=int(many_to_n) or n_items)
    outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
    # TODO: the correctness of `targets` itself should be validated as well, since producing it has become non-trivial.
    assert sorted(outputs) == sorted(targets)
@pytest.fixture
def data_metadata_packs(input_data_items, metadata):
    """Pack every input item together with its metadata entry.

    Each ``pack(item, item_metadata)`` result is serialised with ``json.dumps`` and
    encoded to bytes with ``str.encode``'s default (UTF-8) codec.
    """
    return [
        json.dumps(pack(item, item_metadata)).encode()
        for item, item_metadata in zip(input_data_items, metadata)
    ]
@pytest.fixture
def data_message_pairs(data_metadata_packs):
    """Pair each packed payload with a queue message via ``pair_data_with_queue_message``."""
    return pair_data_with_queue_message(data_metadata_packs)
def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs):
    """Upload each payload and publish its request, in input order.

    Args:
        storage: storage client that receives the compressed payloads.
        queue_manager: queue manager the requests are published to.
        data_message_pairs: iterable of ``(data, message)`` pairs.
    """
    for payload, request_message in data_message_pairs:
        upload_data_to_storage_and_publish_request_to_queue(
            storage,
            queue_manager,
            payload,
            request_message,
        )
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message):
    """Store gzip-compressed ``data`` where ``message`` points, then publish ``message``.

    The storage location comes from ``get_object_descriptor(message)``. The upload
    happens before the request is published.
    """
    descriptor = get_object_descriptor(message)
    compressed = gzip.compress(data)
    storage.put_object(**descriptor, data=compressed)
    queue_manager.publish_request(message)
# TODO: refactor
def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs):
    """Upload all payloads as pages of one object and publish a single request for them.

    The message of the first pair is the reference request. Each payload is paired with
    an entry of the reference message's ``"pages"`` list and stored, gzip-compressed, at
    the path ``build_filepath`` derives for that page. Only the reference message is
    published afterwards.

    Args:
        storage: storage client that receives the compressed pages.
        queue_manager: queue manager the single request is published to.
        data_message_pairs: non-empty sequence of ``(data, message)`` pairs.
    """
    assert data_message_pairs
    ref_message = second(first(data_message_pairs))
    pages = ref_message["pages"]
    # Loop-invariant: compute the reference descriptor once and never mutate it.
    # Each page gets its own copy with a page-specific object name.
    base_descriptor = get_object_descriptor(ref_message)
    payloads = map(first, data_message_pairs)
    # zip stops at the shorter sequence, so surplus payloads or pages are ignored.
    for data, page in zip(payloads, pages):
        page_descriptor = {**base_descriptor, "object_name": build_filepath(base_descriptor, page)}
        storage.put_object(**page_descriptor, data=gzip.compress(data))
    queue_manager.publish_request(ref_message)
def build_filepath(object_descriptor, page):
    """Return the storage path of ``page`` for the object described by ``object_descriptor``.

    A ``pages`` directory is inserted in front of the last path component of
    ``object_descriptor["object_name"]``. Every ``id:<digits>`` occurrence in the
    resulting path is then replaced with ``id:<page>``.

    >>> build_filepath({"object_name": "a/b/id:1.json"}, 5)
    'a/b/pages/id:5.json'
    """
    *parents, filename = object_descriptor["object_name"].split("/")
    path = "/".join([*parents, "pages", filename])
    # Raw string: "\d" in a plain literal is an invalid escape sequence (warning).
    # "\d+" replaces multi-digit ids whole instead of only their first digit.
    return re.sub(r"id:\d+", f"id:{page}", path)
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
    """Collect and decode everything the consumer uploaded, sorted by each item's first element.

    The ``"responseFile"`` entry of every message on the output queue names an object
    in ``bucket_name``. All of those objects are fetched first, then each one is
    expanded with ``decode``.
    """
    response_files = lpluck("responseFile", queue_manager.output_queue.to_list())
    blobs = [storage.get_object(bucket_name, file_name) for file_name in response_files]
    decoded_items = chain.from_iterable(decode(blob) for blob in blobs)
    return sorted(decoded_items, key=itemgetter(0))
@pytest.fixture
def components(components_type, real_components, test_components, bucket_name):
    """Yield a clean ``(storage, queue_manager, consumer)`` triple for one test.

    ``components_type`` picks the ``real_components`` or ``test_components`` triple.
    The queues and the ``bucket_name`` bucket are cleared before the test and again
    during teardown.

    Raises:
        ValueError: if ``components_type`` is neither ``"real"`` nor ``"test"``.
    """
    available = {"real": real_components, "test": test_components}
    if components_type not in available:
        raise ValueError(f"Unknown component type '{components_type}'.")
    storage, queue_manager, consumer = available[components_type]

    def reset():
        # Same order on setup and teardown: queues first, then the bucket.
        queue_manager.clear()
        storage.clear_bucket(bucket_name)

    reset()
    yield storage, queue_manager, consumer
    reset()
def decode(storage_item):
    """Decompress and JSON-parse one stored response, yielding its entries through ``unpack``.

    A response whose top-level JSON value is not a list is treated as a one-element
    list. Because this is a generator, nothing is decompressed until it is iterated.
    """
    parsed = json.loads(gzip.decompress(storage_item).decode())
    entries = parsed if isinstance(parsed, list) else [parsed]
    for entry in entries:
        yield unpack(entry)
@pytest.fixture(params=["real", "test"])
def components_type(request):
    """Name of the component wiring that the ``components`` fixture should build.

    Every param must be a value ``components`` accepts ("real" or "test"). A previous
    "mixed" entry was rejected there with ValueError. Tests can override this with a
    direct ``@pytest.mark.parametrize("components_type", ...)``.
    """
    return request.param
@pytest.fixture
def real_components(url, many_to_n):
    """Build the default storage, queue manager and consumer, with the callback pointed at ``url``.

    The consumer's visitor uses the ``"multi"`` download strategy when ``many_to_n``
    is truthy, and ``"single"`` otherwise.
    """
    consumer = get_consumer(get_callback(url))
    queue_manager = get_queue_manager()
    storage = get_storage()
    if many_to_n:
        strategy_name = "multi"
    else:
        strategy_name = "single"
    consumer.visitor.download_strategy = get_download_strategy(strategy_name)
    return storage, queue_manager, consumer
@pytest.fixture
def test_components(url, queue_manager, storage):
    """Wire a ``Consumer`` by hand around the ``queue_manager`` and ``storage`` fixtures.

    The consumer's ``QueueVisitor`` uses a callback for ``url`` and the response
    strategy returned by ``get_response_strategy(storage)``.
    """
    url_callback = get_callback(url)
    response_strategy = get_response_strategy(storage)
    consumer = Consumer(QueueVisitor(storage, url_callback, response_strategy), queue_manager)
    return storage, queue_manager, consumer