diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 20eb993..795f5d4 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -159,6 +159,8 @@ class PikaQueueManager(QueueManager): try: self.channel.queue_purge(self._input_queue) self.channel.queue_purge(self._output_queue) + assert self.input_queue.to_list() == [] + assert self.output_queue.to_list() == [] except pika.exceptions.ChannelWrongStateError: pass diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index d7d560e..5dc419c 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -153,6 +153,9 @@ class AggregationStorageStrategy(ResponseStrategy): request_metadata = omit(analysis_payload, ["data"]) result_data = peekable(analysis_payload["data"]) for analysis_payload in result_data: + print("------------------------------------------------------") + print("analysis_payload", analysis_payload) + print("request_metadata", request_metadata) yield self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) @@ -229,6 +232,7 @@ class QueueVisitor: return data def process_storage_item(self, data_metadata_pack): + print(data_metadata_pack) return self.callback(data_metadata_pack) def load_item_from_storage_and_process_with_callback(self, queue_item_body): diff --git a/test/conftest.py b/test/conftest.py index 11ee108..a39a664 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -40,7 +40,7 @@ logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1) logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1) -@pytest.fixture(autouse=False) +@pytest.fixture(autouse=True) def mute_logger(): logger.setLevel(logging.CRITICAL + 1) @@ -77,14 +77,14 @@ def mock_make_load_data(): return load_data -# def pytest_make_parametrize_id(config, val, argname): -# return f"\n\t{argname}={val}\n" +def pytest_make_parametrize_id(config, val, argname): + return f"\n\t{argname}={val}\n" -@pytest.fixture(params=["minio", "aws"], scope="session") -def storage(client_name, bucket_name, request, docker_compose): +@pytest.fixture +def storage(client_name, bucket_name, s3_backend, docker_compose): logger.debug("Setup for storage") - storage = Storage(get_adapter(client_name, request.param)) + storage = Storage(get_adapter(client_name, s3_backend)) storage.make_bucket(bucket_name) storage.clear_bucket(bucket_name) yield storage @@ -92,15 +92,21 @@ def storage(client_name, bucket_name, request, docker_compose): storage.clear_bucket(bucket_name) +@pytest.fixture(params=["minio", "aws"], scope="session") +def s3_backend(request): + return request.param + + @pytest.fixture(scope="session", autouse=True) def docker_compose(sleep_seconds=30): - logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") - compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") - compose.start() - logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ") - time.sleep(sleep_seconds) - yield compose - compose.stop() + pass + # logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") + # compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") + # compose.start() + # logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ") + # time.sleep(sleep_seconds) + # yield compose + # compose.stop() def get_pika_connection_params(): diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 8c76f7f..7d07b07 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -1,15 +1,17 @@ +from functools import partial from itertools import starmap, repeat import numpy as np import pytest from PIL import Image -from funcy import lmap, compose, flatten, lflatten, omit +from funcy import lmap, compose, flatten, lflatten, omit, join, pluck, lpluck, second, project, first, lzip from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.normalization import normalize_item from pyinfra.server.packing import pack, unpack from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes +from test.utils.input import pair_data_with_queue_message from test.utils.pdf import pdf_stream @@ -80,16 +82,37 @@ def strings_to_bytes(strings): @pytest.fixture -def targets(input_data_items, operation, metadata): +def targets(data_message_pairs, input_data_items, operation, metadata): + """TODO: this has become super wonky""" + print(data_message_pairs) + klaus = lmap(second, data_message_pairs) + print(klaus) + metadata = [{**m1, **m2} for m1, m2 in zip(klaus, metadata)] if operation is Nothing: return Nothing op = compose(lift(star(pack)), normalize_item, operation) - expected = lmap(unpack, flatten(starmap(op, zip(input_data_items, metadata)))) + + try: + response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata))))) + + queue_message_keys = second(first(pair_data_with_queue_message([b""]))).keys() + print(queue_message_keys) + response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata) + expected = lzip(response_data, response_metadata) + + except ValueError: + print() + print(input_data_items) + print(metadata) + expected = [] + + print("expected", expected) return expected + @pytest.fixture def endpoint(url): return f"{url}/submit" @@ -133,15 +156,10 @@ def images_to_bytes(images): @pytest.fixture -def metadata(n_items): - return repeat({"key": "value", "pages": [0, 2, 3]}) +def metadata(): + return repeat({"key": "value"}) @pytest.fixture def packages(input_data_items, metadata): return lstarlift(pack)(zip(input_data_items, metadata)) - - -@pytest.fixture(params=[False]) -def many_to_many(request): - return request.param diff --git a/test/fixtures/server.py b/test/fixtures/server.py index cae4fde..b238651 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -63,8 +63,10 @@ def operation(core_operation): def op(data, metadata): assert isinstance(metadata, dict) result = core_operation(data, metadata) + print("result", result, type(result)) if isinstance(result, Generator): for data, metadata in result: + print(5555555555555555555555555555555) yield data, omit(metadata, ["pages", "operation"]) else: data, metadata = result @@ -85,8 +87,11 @@ def core_operation(item_type, one_to_many, analysis_task): return string.decode().upper().encode(), metadata def extract(string: bytes, metadata): + print() + print("metadata", metadata) for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items(): metadata["id"] = i + print("XYZ", metadata) yield c.encode(), metadata def rotate(im: bytes, metadata): @@ -132,6 +137,11 @@ def one_to_many(request): return request.param +@pytest.fixture(params=[True, False]) +def many_to_n(request): + return request.param + + @pytest.fixture(params=[True, False]) def analysis_task(request): return request.param diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 494b91c..87745e8 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -20,9 +20,23 @@ from test.utils.input import pair_data_with_queue_message @pytest.mark.parametrize( - "one_to_many", + "s3_backend", + [ + "minio", + "aws", + ], +) +@pytest.mark.parametrize( + "batched", [ False, + # True, + ], +) +@pytest.mark.parametrize( + "one_to_many", + [ + # False, True, ], ) @@ -30,18 +44,24 @@ from test.utils.input import pair_data_with_queue_message "analysis_task", [ False, - True, + # True, + ], +) +@pytest.mark.parametrize( + "n_items", + [ + 1, + # 3, ], ) -@pytest.mark.parametrize("n_items", [1, 3]) @pytest.mark.parametrize("n_pages", [2]) @pytest.mark.parametrize("buffer_size", [2]) @pytest.mark.parametrize( "item_type", [ "string", - "image", - "pdf", + # "image", + # "pdf", ], ) @pytest.mark.parametrize( @@ -55,34 +75,68 @@ from test.utils.input import pair_data_with_queue_message @pytest.mark.parametrize( "client_name", [ - # "mock", - "s3", - "azure", + "mock", + # "s3", + # "azure", ], scope="session", ) @pytest.mark.parametrize( "components_type", [ - # "test", - "real", + "test", + # "real", ], ) -def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_many): +@pytest.mark.parametrize( + "many_to_n", + [ + False, + # True, + ], +) +def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n): + input("before") + print() storage, queue_manager, consumer = components - if many_to_many: - upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs) + assert queue_manager.input_queue.to_list() == [] + assert queue_manager.output_queue.to_list() == [] + assert [*storage.get_all_object_names(bucket_name)] == [] + + if many_to_n: + # for data, message in data_message_pairs: + # # storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) + # print("message", message) + # queue_manager.publish_request(message) + + upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs) + storage.clear_bucket(bucket_name) + queue_manager.clear() + outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name) + # upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs) + return else: + print(22222222222222222222222222222222222222222222222222222222222222222222) + if n_items: + assert data_message_pairs upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs) + print(33333333333333333333333333333333333333333333333) + consumer.consume_and_publish(n=n_items) + + print(44444444444444444444444444444444444444444444444) + print([*storage.get_all_object_names(bucket_name)]) outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name) + print("BLYAT", data_message_pairs) # TODO: correctness of target should be validated as well, since production was become non-trivial assert sorted(outputs) == sorted(targets) + input("after") + @pytest.fixture def data_metadata_packs(input_data_items, metadata): @@ -105,14 +159,18 @@ def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, queue_manager.publish_request(message) -def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs): - for i, (data, message) in enumerate(data_message_pairs): - object_descriptor = get_object_descriptor(message) - object_name = object_descriptor["object_name"] - object_descriptor["object_name"] = f"{object_name}/pages/{i}" - storage.put_object(**object_descriptor, data=gzip.compress(data)) - - queue_manager.publish_request(message) +# def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs): +# print() +# print(22222222222222222222222222222222222222222) +# assert data_message_pairs +# for i, (data, message) in enumerate(data_message_pairs): +# print(i) +# object_descriptor = get_object_descriptor(message) +# object_name = object_descriptor["object_name"] +# object_descriptor["object_name"] = f"{object_name}/pages/{i}" +# storage.put_object(**object_descriptor, data=gzip.compress(data)) +# +# queue_manager.publish_request(message) def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name): @@ -123,13 +181,13 @@ def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name): @pytest.fixture -def components(components_type, real_components, test_components, bucket_name): - if components_type == "real": - components = real_components - elif components_type == "test": +def components(components_type, test_components, bucket_name): + # if components_type == "real": + # components = real_components + if components_type == "test": components = test_components else: - raise ValueError(f"Unknown components type '{components_type}'.") + raise ValueError(f"Unknown component type '{components_type}'.") storage, queue_manager, consumer = components @@ -138,6 +196,9 @@ def components(components_type, real_components, test_components, bucket_name): yield storage, queue_manager, consumer + queue_manager.clear() + print() + print("queue", queue_manager.input_queue.to_list()) storage.clear_bucket(bucket_name) diff --git a/test/utils/input.py b/test/utils/input.py index daf754c..77003bc 100644 --- a/test/utils/input.py +++ b/test/utils/input.py @@ -9,6 +9,7 @@ def pair_data_with_queue_message(data: Iterable[bytes]): "fileId": f"file{i}", "targetFileExtension": "in.gz", "responseFileExtension": "out.gz", + "pages": [0, 2, 3] } yield d, body