# End-to-end serving tests: upload payloads to storage, publish queue requests,
# run the consumer, and compare its published outputs against expected targets.

import gzip
import json
from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, lpluck
from pyinfra.default_objects import (
get_callback,
get_response_strategy,
get_consumer,
get_queue_manager,
get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import unpack, pack
from pyinfra.visitor import get_object_descriptor, QueueVisitor
from test.utils.input import pair_data_with_queue_message
@pytest.mark.parametrize(
    "s3_backend",
    [
        "minio",
        "aws",
    ],
)
@pytest.mark.parametrize(
    "batched",
    [
        False,
        # True,
    ],
)
@pytest.mark.parametrize(
    "one_to_many",
    [
        # False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        False,
        # True,
    ],
)
@pytest.mark.parametrize(
    "n_items",
    [
        1,
        # 3,
    ],
)
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        # "image",
        # "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        "mock",
        # "s3",
        # "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        "test",
        # "real",
    ],
)
@pytest.mark.parametrize(
    "many_to_n",
    [
        False,
        # True,
    ],
)
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
    """End-to-end check: publish requests, run the consumer, and verify the data
    it uploads and reports back matches the expected targets."""
    storage, queue_manager, consumer = components
    # Preconditions: both queues and the bucket start empty.
    assert queue_manager.input_queue.to_list() == []
    assert queue_manager.output_queue.to_list() == []
    assert [*storage.get_all_object_names(bucket_name)] == []
    if many_to_n:
        upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs)
        storage.clear_bucket(bucket_name)
        queue_manager.clear()
        outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
        # NOTE(review): this branch returns before the target assertion below, so it
        # verifies nothing — confirm whether it is still work in progress.
        return
    if n_items:
        assert data_message_pairs
        upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs)
    # Consume exactly n_items requests and publish the corresponding responses.
    consumer.consume_and_publish(n=n_items)
    outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
    # TODO: correctness of target should be validated as well, since production has become non-trivial.
    assert sorted(outputs) == sorted(targets)
@pytest.fixture
def data_metadata_packs(input_data_items, metadata):
    """Pack each (item, metadata) pair, JSON-serialize it, and encode it to bytes."""
    return [json.dumps(pack(item, meta)).encode() for item, meta in zip(input_data_items, metadata)]
@pytest.fixture
def data_message_pairs(data_metadata_packs):
    """Pair each packed payload with the queue message that references it."""
    return pair_data_with_queue_message(data_metadata_packs)
def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs):
    """Upload every payload to storage and publish its request message."""
    for pair in data_message_pairs:
        upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, *pair)
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message):
    """Gzip the payload into storage at the message's object descriptor, then publish the request."""
    descriptor = get_object_descriptor(message)
    compressed = gzip.compress(data)
    storage.put_object(**descriptor, data=compressed)
    queue_manager.publish_request(message)
# def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs):
# print()
# print(22222222222222222222222222222222222222222)
# assert data_message_pairs
# for i, (data, message) in enumerate(data_message_pairs):
# print(i)
# object_descriptor = get_object_descriptor(message)
# object_name = object_descriptor["object_name"]
# object_descriptor["object_name"] = f"{object_name}/pages/{i}"
# storage.put_object(**object_descriptor, data=gzip.compress(data))
#
# queue_manager.publish_request(message)
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
    """Download every file the consumer reported on the output queue and return
    the decoded items, sorted by their first element."""
    response_messages = queue_manager.output_queue.to_list()
    file_names = lpluck("responseFile", response_messages)
    downloaded = (storage.get_object(bucket_name, name) for name in file_names)
    decoded_items = chain.from_iterable(map(decode, downloaded))
    return sorted(decoded_items, key=itemgetter(0))
@pytest.fixture
def components(components_type, test_components, bucket_name):
    """Yield (storage, queue_manager, consumer) with a clean queue and bucket,
    and clean both up again after the test.

    Raises ValueError for any components_type other than "test".
    """
    if components_type == "test":
        components = test_components
    else:
        raise ValueError(f"Unknown component type '{components_type}'.")
    storage, queue_manager, consumer = components
    # Ensure a clean slate before the test runs.
    queue_manager.clear()
    storage.clear_bucket(bucket_name)
    yield storage, queue_manager, consumer
    # Teardown: leave no messages or objects behind for the next test.
    queue_manager.clear()
    storage.clear_bucket(bucket_name)
def decode(storage_item):
    """Gunzip and JSON-decode a stored blob, yielding each unpacked item.

    A non-list payload is treated as a single-item list.
    """
    decoded = json.loads(gzip.decompress(storage_item).decode())
    items = decoded if isinstance(decoded, list) else [decoded]
    for item in items:
        yield unpack(item)
# Fixture-level parametrization of the component flavor.
# NOTE(review): the `components` fixture above only accepts "test" and raises
# ValueError for anything else, and `test_serving` overrides this fixture via
# @pytest.mark.parametrize("components_type", ["test"]) — confirm these params
# ("real", "mixed") are still reachable/intended.
@pytest.fixture(params=["real", "mixed"])
def components_type(request):
    return request.param
@pytest.fixture
def real_components(url):
    """Build the production components: storage, queue manager, and a consumer
    whose callback targets `url`."""
    storage = get_storage()
    queue_manager = get_queue_manager()
    consumer = get_consumer(get_callback(url))
    return storage, queue_manager, consumer
@pytest.fixture
def test_components(url, queue_manager, storage):
    """Build test components around the given storage and queue manager: a
    Consumer driven by a QueueVisitor whose callback targets `url`."""
    response_strategy = get_response_strategy(storage)
    visitor = QueueVisitor(storage, get_callback(url), response_strategy)
    return storage, queue_manager, Consumer(visitor, queue_manager)