"""End-to-end serving tests: upload inputs to storage, publish queue requests,
run the consumer, and compare what it uploaded against the expected targets.
"""

import gzip
import json
import re
from itertools import starmap, repeat, chain
from operator import itemgetter

import pytest
from funcy import compose, lpluck, first, second

from pyinfra.default_objects import (
    get_callback,
    get_response_strategy,
    get_consumer,
    get_queue_manager,
    get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import unpack, pack
from pyinfra.visitor import QueueVisitor, get_download_strategy
from test.utils.input import pair_data_with_queue_message


@pytest.mark.parametrize(
    "batched",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "one_to_many",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        # False,
        True,
    ],
)
@pytest.mark.parametrize(
    "n_items",
    [
        1,
        3,
    ],
)
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [1, 2])
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        "image",
        "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        "mock",
        # "s3",
        # "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
@pytest.mark.parametrize(
    "many_to_n",
    [
        False,
        True,
    ],
)
def test_serving(
    server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n, download_strategy
):
    """Round-trip test of the full serving pipeline.

    Uploads the test inputs to storage, publishes the matching requests to the
    input queue, lets the consumer process them, and asserts the outputs it
    uploaded match the expected targets.
    """
    storage, queue_manager, consumer = components
    # Preconditions: both queues and the bucket must start empty.
    assert queue_manager.input_queue.to_list() == []
    assert queue_manager.output_queue.to_list() == []
    assert [*storage.get_all_object_names(bucket_name)] == []
    if n_items:
        assert data_message_pairs
    if many_to_n:
        # Many input objects collapse into a single request (one message
        # referencing a folder of pages).
        upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
            storage, queue_manager, data_message_pairs, download_strategy
        )
    else:
        upload_data_to_storage_and_publish_requests_to_queue(
            storage, queue_manager, data_message_pairs, download_strategy
        )
    # many_to_n -> exactly one message to consume; otherwise one per item.
    consumer.consume_and_publish(n=int(many_to_n) or n_items)
    outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
    # TODO: correctness of target should be validated as well, since production has become non-trivial
    assert sorted(outputs) == sorted(targets)


@pytest.fixture
def data_metadata_packs(input_data_items, metadata):
    """Pack each (data, metadata) pair and serialize it to UTF-8 JSON bytes."""
    return list(starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata)))


@pytest.fixture
def data_message_pairs(data_metadata_packs):
    """Pair each packed payload with the queue message that references it."""
    data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
    return data_message_pairs


# TODO: refactor; too many params
def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs, download_strategy):
    """Upload every payload to storage and publish one request per payload."""
    for data, message in data_message_pairs:
        upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy)


# TODO: refactor; too many params
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy):
    """Upload a single gzip-compressed payload and publish its request message."""
    storage.put_object(**download_strategy.get_object_descriptor(message), data=gzip.compress(data))
    queue_manager.publish_request(message)


# TODO: refactor body; too long and scripty
def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
    storage, queue_manager, data_message_pairs, download_strategy
):
    """Upload all payloads as pages under one folder and publish one request.

    The first pair's message is used as the reference for the object
    descriptor and for the list of pages; each payload is stored under a
    per-page path derived from that descriptor.
    """
    assert data_message_pairs
    ref_message = second(first(data_message_pairs))
    pages = ref_message["pages"]
    for data, page in zip(map(first, data_message_pairs), pages):
        object_descriptor = download_strategy.get_object_descriptor(ref_message)
        object_descriptor["object_name"] = build_filepath(object_descriptor, page)
        storage.put_object(**object_descriptor, data=gzip.compress(data))
    queue_manager.publish_request(ref_message)


def build_filepath(object_descriptor, page):
    """Derive a per-page object path from a folder-level object descriptor.

    Inserts a "pages" directory before the file name and substitutes the
    page number into the "id:<digit>" segment of the path.
    """
    object_name = object_descriptor["object_name"]
    parts = object_name.split("/")
    parts.insert(-1, "pages")
    path = "/".join(parts)
    # NOTE(review): \d matches a single digit only — multi-digit ids would be
    # rewritten partially (e.g. "id:12" -> "id:<page>2"); confirm whether
    # r"id:\d+" is intended. Raw string fixes the invalid escape sequence.
    path = re.sub(r"id:\d", f"id:{page}", path)
    return path


def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
    """Fetch, decompress, and unpack everything the consumer uploaded.

    Reads the response file names from the output queue, downloads each
    object from storage, and returns the decoded items sorted by their
    first element.
    """
    names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
    uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
    outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
    return outputs


@pytest.fixture
def components(components_type, real_components, test_components, bucket_name):
    """Yield (storage, queue_manager, consumer) cleaned before and after use."""
    if components_type == "real":
        components = real_components
    elif components_type == "test":
        components = test_components
    else:
        raise ValueError(f"Unknown component type '{components_type}'.")
    storage, queue_manager, consumer = components
    queue_manager.clear()
    storage.make_bucket(bucket_name)
    storage.clear_bucket(bucket_name)
    yield storage, queue_manager, consumer
    queue_manager.clear()
    storage.clear_bucket(bucket_name)


def decode(storage_item):
    """Decompress and JSON-decode a stored object, yielding unpacked items.

    Single (non-list) payloads are wrapped in a list so both shapes yield
    a stream of unpacked items.
    """
    storage_item = json.loads(gzip.decompress(storage_item).decode())
    if not isinstance(storage_item, list):
        storage_item = [storage_item]
    yield from map(unpack, storage_item)


# NOTE(review): this fixture is shadowed by the @pytest.mark.parametrize
# ("components_type", ["real"]) on test_serving; its "mixed" param would hit
# the ValueError branch in `components` if it were ever used — confirm intent.
@pytest.fixture(params=["real", "mixed"])
def components_type(request):
    """Select which component set to assemble (overridden by parametrize)."""
    return request.param


@pytest.fixture
def real_components(url, download_strategy):
    """Build production-configured storage, queue manager, and consumer."""
    callback = get_callback(url)
    consumer = get_consumer(callback)
    queue_manager = get_queue_manager()
    storage = get_storage()
    consumer.visitor.download_strategy = download_strategy
    return storage, queue_manager, consumer


@pytest.fixture
def download_strategy(many_to_n):
    """Pick the multi-object or single-object download strategy."""
    return get_download_strategy("multi" if many_to_n else "single")


@pytest.fixture
def test_components(url, queue_manager, storage):
    """Build test components around the provided queue manager and storage."""
    callback = get_callback(url)
    visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
    consumer = Consumer(visitor, queue_manager)
    return storage, queue_manager, consumer