import json
import re
from itertools import starmap, repeat, chain
from operator import itemgetter

import pytest
from funcy import compose, first, second, pluck, lflatten, lzip

from pyinfra.config import CONFIG
from pyinfra.default_objects import ComponentFactory, get_component_factory
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import unpack, pack
from pyinfra.utils.encoding import compress, decompress
from pyinfra.visitor import QueueVisitor
from test.config import CONFIG as TEST_CONFIG


@pytest.mark.parametrize(
    "batched",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "one_to_many",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "n_items",
    [
        1,
        3,
    ],
)
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [1, 2])
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        "image",
        "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        "mock",
        # "s3",
        # "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
@pytest.mark.parametrize(
    "many_to_n",
    [
        True,
        # False,
    ],
)
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
    """End-to-end serving test.

    Uploads the input items to storage and publishes the matching request(s) to the input queue. It then
    lets the consumer process them, downloads and decodes every file the consumer reported in its output
    messages, and compares the result against ``targets``.
    """
    storage, queue_manager, consumer, file_descriptor_manager = components

    # Start from a clean slate: no pending messages and an empty bucket.
    assert queue_manager.input_queue.to_list() == []
    assert queue_manager.output_queue.to_list() == []
    assert [*storage.get_all_object_names(bucket_name)] == []

    if n_items:
        assert data_message_pairs

    if many_to_n:
        upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
            storage, queue_manager, data_message_pairs, file_descriptor_manager
        )
    else:
        upload_data_to_storage_and_publish_requests_to_queue(
            storage, queue_manager, data_message_pairs, file_descriptor_manager
        )

    # In the many_to_n case a single request was published, so consume one message;
    # otherwise consume n_items messages.
    consumer.consume_and_publish(n=int(many_to_n) or n_items)
    outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)

    # TODO: correctness of target should be validated as well, since production has become non-trivial
    assert sorted(outputs) == sorted(targets)


@pytest.fixture
def data_metadata_packs(input_data_items, metadata):
    """Pack each (data item, metadata) pair, serialise the pack to JSON and encode it to bytes."""
    # funcy.compose applies right-to-left: pack -> json.dumps -> str.encode.
    return list(starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata)))


@pytest.fixture
def data_message_pairs(data_metadata_packs, queue_message_metadata):
    """Pair each packed payload with its queue message (truncated to the shorter of the two)."""
    return lzip(data_metadata_packs, queue_message_metadata)


# TODO: refactor; too many params
def upload_data_to_storage_and_publish_requests_to_queue(
    storage, queue_manager, data_message_pairs, file_descriptor_manager
):
    """Upload each payload and publish one request per (data, message) pair."""
    for data, message in data_message_pairs:
        upload_data_to_storage_and_publish_request_to_queue(
            storage, queue_manager, data, message, file_descriptor_manager
        )


# TODO: refactor; too many params
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager):
    """Store ``data`` (compressed) at the message's input object location, then publish ``message``."""
    storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=compress(data))
    queue_manager.publish_request(message)


def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
    storage, queue_manager, data_message_pairs, file_descriptor_manager
):
    """Upload every payload as one page of a single request, then publish only that one request.

    The message of the first pair is used as the reference request. Its ``pages`` entries are paired in
    order with the payloads, and iteration stops at the shorter of the two. Each payload is stored under
    the reference object name, with its ``id:<digits>`` part replaced by the page (see ``build_filepath``).
    """
    assert data_message_pairs
    ref_message = second(first(data_message_pairs))
    pages = ref_message["pages"]

    for data, page in zip(map(first, data_message_pairs), pages):
        # Fetch a fresh descriptor each iteration because it is mutated below.
        object_descriptor = file_descriptor_manager.get_input_object_descriptor(ref_message)
        object_descriptor["object_name"] = build_filepath(object_descriptor, page)
        storage.put_object(**object_descriptor, data=compress(data))

    queue_manager.publish_request(ref_message)


def build_filepath(object_descriptor, page):
    """Return the descriptor's object name with every ``id:<digits>`` occurrence replaced by ``id:<page>``."""
    # Raw string: "\d" inside a plain literal is an invalid escape sequence (SyntaxWarning on 3.12+).
    # "\d+" so that multi-digit ids are replaced whole rather than only their first digit.
    return re.sub(r"id:\d+", f"id:{page}", object_descriptor["object_name"])


def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
    """Download and decode every file listed under ``response_files`` in the output-queue messages.

    Returns all decoded entries flattened into one list, sorted by their first element.
    """
    names_of_uploaded_files = lflatten(pluck("response_files", queue_manager.output_queue.to_list()))
    uploaded_files = (storage.get_object(bucket_name, name) for name in names_of_uploaded_files)
    return sorted(chain.from_iterable(map(decode, uploaded_files)), key=itemgetter(0))


@pytest.fixture
def components(components_type, real_components, test_components, bucket_name):
    """Select the component set for ``components_type`` and reset queue and bucket around the test.

    Yields ``(storage, queue_manager, consumer, file_descriptor_manager)``.

    Raises:
        ValueError: if ``components_type`` is neither ``"real"`` nor ``"test"``.
    """
    if components_type == "real":
        components = real_components
    elif components_type == "test":
        # NOTE(review): ``test_components`` is currently a placeholder returning None,
        # so the unpacking below fails for this branch.
        components = test_components
    else:
        raise ValueError(f"Unknown component type '{components_type}'.")

    storage, queue_manager, consumer, file_descriptor_manager = components

    # Setup: drain queues and make sure the bucket exists and is empty.
    queue_manager.clear()
    storage.make_bucket(bucket_name)
    storage.clear_bucket(bucket_name)

    yield storage, queue_manager, consumer, file_descriptor_manager

    # Teardown: leave no messages or objects behind for the next test.
    queue_manager.clear()
    storage.clear_bucket(bucket_name)


def decode(storage_item):
    """Yield the unpacked entries of a compressed JSON storage object.

    The decompressed payload may be a single JSON value or a list of them. A single value is treated as
    a one-element list.
    """
    storage_item = json.loads(decompress(storage_item).decode())
    if not isinstance(storage_item, list):
        storage_item = [storage_item]
    yield from map(unpack, storage_item)


@pytest.fixture(params=["real", "mixed"])
def components_type(request):
    """Component set to use; tests may override it via direct parametrization.

    NOTE(review): ``"mixed"`` is not handled by ``components`` (it raises ValueError). ``test_serving``
    avoids this by parametrizing ``components_type`` directly.
    """
    return request.param


@pytest.fixture
def real_components(url, download_strategy_type):
    """Build storage, queue manager, consumer and file descriptor manager from the global ``CONFIG``.

    NOTE(review): this overwrites entries of the global ``CONFIG["service"]`` and never restores them, so
    the overrides persist into later tests. Confirm that is intended.
    """
    CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
    CONFIG["service"]["response_formatter"] = TEST_CONFIG.service.response_formatter
    CONFIG["service"]["download_strategy"] = download_strategy_type

    component_factory = get_component_factory(CONFIG)
    callback = component_factory.get_callback(url)
    consumer = component_factory.get_consumer(callback)
    queue_manager = component_factory.get_queue_manager()
    storage = component_factory.get_storage()
    file_descriptor_manager = component_factory.get_file_descriptor_manager()

    return storage, queue_manager, consumer, file_descriptor_manager


@pytest.fixture
def download_strategy_type(many_to_n):
    """Use the ``"multi"`` download strategy for many-to-n requests, ``"single"`` otherwise."""
    return "multi" if many_to_n else "single"


@pytest.fixture
def test_components(url, queue_manager, storage, download_strategy_type):
    """Placeholder for a component set built from test doubles; currently returns None.

    The intended implementation is kept below, commented out.
    """
    pass
    #
    # component_factory = ComponentFactory(CONFIG)
    #
    # download_strategy = component_factory.get_download_strategy(download_strategy_type)
    # file_descriptor_manager = component_factory.get_file_descriptor_manager()
    #
    # visitor = QueueVisitor(
    #     storage=storage,
    #     callback=component_factory.get_callback(url),
    #     response_strategy=component_factory.get_response_strategy(storage),
    #     download_strategy=download_strategy,
    # )
    # consumer = Consumer(visitor, queue_manager)
    #
    # return storage, queue_manager, consumer, file_descriptor_manager