From 23876501dc373cf6d1800d0adb0fd3c9e4af5e9a Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 22 Jun 2022 12:00:42 +0200 Subject: [PATCH] refactoring: de-hardcorded operation by refactoring queue message metadata fixture --- test/config.yaml | 4 ++++ test/fixtures/consumer.py | 14 -------------- test/fixtures/input.py | 22 +++++++++++++++++----- test/integration_tests/serve_test.py | 8 +++----- test/unit_tests/server/pipeline_test.py | 21 +++++++++------------ test/utils/input.py | 16 ---------------- 6 files changed, 33 insertions(+), 52 deletions(-) delete mode 100644 test/utils/input.py diff --git a/test/config.yaml b/test/config.yaml index 40fb706..20fa617 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -5,6 +5,10 @@ service: input: subdir: op_inp_files extension: IN.gz + single_inp_op: + input: + subdir: "" + extension: IN.gz storage: minio: diff --git a/test/fixtures/consumer.py b/test/fixtures/consumer.py index 8b663b1..80bb4ea 100644 --- a/test/fixtures/consumer.py +++ b/test/fixtures/consumer.py @@ -1,22 +1,8 @@ -from _operator import itemgetter - import pytest from pyinfra.queue.consumer import Consumer -from test.utils.input import pair_data_with_queue_message @pytest.fixture(scope="session") def consumer(queue_manager, callback): return Consumer(callback, queue_manager) - - -@pytest.fixture(scope="session") -def access_callback(): - return itemgetter("fileId") - - -@pytest.fixture() -def items(): - numbers = [f"{i}".encode() for i in range(3)] - return pair_data_with_queue_message(numbers) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index b6387ae..00e5c90 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -11,7 +11,6 @@ from pyinfra.server.normalization import normalize_item from pyinfra.server.packing import pack, unpack from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes -from test.utils.input import pair_data_with_queue_message from test.utils.pdf import pdf_stream @@ -74,7 +73,7 @@ def strings_to_bytes(strings): @pytest.fixture -def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test): +def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test, queue_message_metadata): """TODO: this has become super wonky""" metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)] @@ -87,7 +86,7 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata))))) queue_message_keys = ["id"] * (not server_side_test) + [ - *second(first(pair_data_with_queue_message([b""]))).keys() + *first(queue_message_metadata).keys() ] response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata) expected = lzip(response_data, response_metadata) @@ -99,8 +98,8 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si @pytest.fixture -def endpoint(url): - return f"{url}/submit" +def endpoint(url, operation_name): + return f"{url}/{operation_name}" @pytest.fixture(params=["rest", "basic"]) @@ -148,6 +147,19 @@ def metadata(n_items, many_to_n): return list(repeat({"key": "value"}, times=n_items)) +@pytest.fixture +def queue_message_metadata(n_items, operation_name): + def metadata(i): + return { + "dossierId": "folder", + "fileId": f"file{i}", + "pages": [0, 2, 3], + "operation": operation_name, + } + + return lmap(metadata, range(n_items)) + + @pytest.fixture def packages(input_data_items, metadata): return lstarlift(pack)(zip(input_data_items, metadata)) diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index bd9e6ca..4465a54 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -4,7 +4,7 @@ from itertools import starmap, repeat, chain from operator import itemgetter import pytest -from funcy import compose, first, second, pluck, lflatten +from funcy import compose, first, second, pluck, lflatten, lzip from pyinfra.config import CONFIG from pyinfra.default_objects import ComponentFactory, get_component_factory @@ -13,7 +13,6 @@ from pyinfra.server.packing import unpack, pack from pyinfra.utils.encoding import compress, decompress from pyinfra.visitor import QueueVisitor from test.config import CONFIG as TEST_CONFIG -from test.utils.input import pair_data_with_queue_message @pytest.mark.parametrize( @@ -119,9 +118,8 @@ def data_metadata_packs(input_data_items, metadata): @pytest.fixture -def data_message_pairs(data_metadata_packs): - data_message_pairs = pair_data_with_queue_message(data_metadata_packs) - return data_message_pairs +def data_message_pairs(data_metadata_packs, queue_message_metadata): + return lzip(data_metadata_packs, queue_message_metadata) # TODO: refactor; too many params diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 1508c57..0197d95 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -1,5 +1,5 @@ import pytest -from funcy import rcompose, compose, project, second, merge +from funcy import rcompose, compose, project, second, merge, lpluck from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.client_pipeline import ClientPipeline @@ -15,14 +15,13 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic from pyinfra.utils.func import lift, llift -from test.utils.input import pair_data_with_queue_message def test_mock_pipeline(): data = [1, 2, 3] - f, g, h, u = map(lift, [lambda x: x**2, lambda x: x + 2, lambda x: x / 2, lambda x: x]) + f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x]) pipeline = ClientPipeline(f, g, h, u) @@ -31,23 +30,21 @@ def test_mock_pipeline(): @pytest.mark.parametrize("client_pipeline_type", ["basic", "rest"]) @pytest.mark.parametrize("server_side_test", [True]) -def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, n_items): +def test_pipeline( + core_operation, client_pipeline, input_data_items, metadata, queue_message_metadata, targets, n_items +): assert core_operation is not Nothing - metadata = add_queue_metadata_to_server_side_metadata(metadata) + metadata = [ + merge(storage_mdt, project(queue_mdt, ["operation", "pages"])) + for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata) + ] output = compose(llift(unpack), client_pipeline)(input_data_items, metadata) assert n_items == 0 or len(output) > 0 assert output == targets -def add_queue_metadata_to_server_side_metadata(metadata): - return [ - merge(project(second(mdt), [*mdt_o.keys(), "pages"]), mdt_o) - for mdt, mdt_o in zip(pair_data_with_queue_message(metadata), metadata) - ] - - @pytest.mark.parametrize("item_type", ["string"]) @pytest.mark.parametrize("n_items", [1]) def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size): diff --git a/test/utils/input.py b/test/utils/input.py deleted file mode 100644 index 540aef1..0000000 --- a/test/utils/input.py +++ /dev/null @@ -1,16 +0,0 @@ -from typing import Iterable - - -# TODO: make into fixture -def pair_data_with_queue_message(data: Iterable[bytes]): - def inner(): - for i, d in enumerate(data): - body = { - "dossierId": "folder", - "fileId": f"file{i}", - "pages": [0, 2, 3], - "operation": "multi_inp_op", # TODO: de-hardcode - } - yield d, body - - return list(inner())