"""Integration test for serving storage items through the queue consumer.

Each test case uploads gzip-compressed payloads to storage, publishes one
request per payload, lets the consumer process them and compares the
response files found in storage with the expected targets.
"""
import gzip
import json
from itertools import starmap, repeat, chain
from operator import itemgetter

import pytest
from funcy import compose, lzip, lpluck

from pyinfra.default_objects import (
    get_callback,
    get_response_strategy,
    get_consumer,
    get_queue_manager,
    get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import bytes_to_string, unpack, pack
from pyinfra.visitor import get_object_descriptor, QueueVisitor
from test.utils.input import pair_data_with_queue_message


@pytest.fixture
def test_components(url, queue_manager, storage):
    """Build a consumer from the injected ``queue_manager`` and ``storage`` fixtures.

    Returns:
        A ``(storage, queue_manager, consumer)`` tuple.
    """
    callback = get_callback(url)
    visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
    consumer = Consumer(visitor, queue_manager)
    return storage, queue_manager, consumer


@pytest.fixture
def real_components(url):
    """Build all components through the ``pyinfra.default_objects`` factories.

    Returns:
        A ``(storage, queue_manager, consumer)`` tuple.
    """
    callback = get_callback(url)
    consumer = get_consumer(callback)
    queue_manager = get_queue_manager()
    storage = get_storage()
    return storage, queue_manager, consumer


@pytest.fixture
def components(components_type, real_components, test_components):
    """Select the component triple that matches ``components_type``.

    Raises:
        ValueError: If ``components_type`` is neither ``"real"`` nor ``"test"``.
    """
    if components_type == "real":
        return real_components
    elif components_type == "test":
        return test_components
    else:
        raise ValueError(f"Unknown components type '{components_type}'.")


# The params must be values that `components` understands; the previous
# "mixed" entry always ended in the ValueError branch of that fixture.
@pytest.fixture(params=["real", "test"])
def components_type(request):
    """Yield each supported components type in turn."""
    return request.param


def decode(storage_item):
    """Yield the unpacked entries of a JSON-encoded storage item.

    Args:
        storage_item: Bytes holding a JSON document; a document that is not
            a list is treated as a single entry.

    Yields:
        The result of ``unpack`` for every entry.
    """
    storage_item = json.loads(storage_item.decode())
    if not isinstance(storage_item, list):
        storage_item = [storage_item]
    yield from map(unpack, storage_item)


@pytest.mark.parametrize(
    "one_to_many",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize("n_items", [1, 3])
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize(
    "storage_item_has_metadata",
    [
        True,
        False,
    ],
)
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        "image",
        "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        # "mock",
        "s3",
        "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
def test_serving(
    server_process,
    input_data_items,
    metadata,
    bucket_name,
    components,
    storage_item_has_metadata,
    target_data_items,
    targets,
):
    """Check that consumed requests produce the expected response files.

    ``server_process`` is requested but never referenced in the body;
    presumably it starts the server under test (defined outside this file,
    TODO confirm).
    """
    storage, queue_manager, consumer = components

    # Start from a clean slate so leftovers of earlier runs cannot interfere.
    queue_manager.clear()
    storage.clear_bucket(bucket_name)

    if storage_item_has_metadata:
        # Payload is the JSON-encoded pack of each (data, metadata) pair.
        data_metadata_packs = starmap(compose(str.encode, json.dumps, pack), zip(input_data_items, metadata))
    else:
        # Payload is the bare data as a JSON string; the expected targets are
        # rebuilt with empty metadata instead of using the `targets` fixture.
        data_metadata_packs = map(compose(str.encode, json.dumps, bytes_to_string), input_data_items)
        metadata = repeat({})
        targets = lzip(target_data_items, metadata)

    # Both sides are sorted by their first element before comparison.
    targets = sorted(targets, key=itemgetter(0))

    data_message_pairs = pair_data_with_queue_message(data_metadata_packs)

    for data, message in data_message_pairs:
        storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
        queue_manager.publish_request(message)

    consumer.consume_and_publish(n=len(data_message_pairs))

    names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
    uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))

    outputs = sorted(chain.from_iterable(map(decode, uploaded_files)), key=itemgetter(0))

    assert outputs == targets

    storage.clear_bucket(bucket_name)