From 14ab23b2ccc3df46dd93b2bf413aac6044fdea54 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Mon, 13 Jun 2022 15:36:17 +0200 Subject: [PATCH] fixed bug in operation wrapper returning a tuple instead of an singleton-iterable with a tuple in one of the return-cases. --- test/fixtures/input.py | 17 ++++------------- test/fixtures/server.py | 7 +------ test/unit_tests/server/pipeline_test.py | 13 ++++++++++++- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 7d07b07..298bbe1 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -4,7 +4,7 @@ from itertools import starmap, repeat import numpy as np import pytest from PIL import Image -from funcy import lmap, compose, flatten, lflatten, omit, join, pluck, lpluck, second, project, first, lzip +from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.normalization import normalize_item @@ -84,10 +84,7 @@ def strings_to_bytes(strings): @pytest.fixture def targets(data_message_pairs, input_data_items, operation, metadata): """TODO: this has become super wonky""" - print(data_message_pairs) - klaus = lmap(second, data_message_pairs) - print(klaus) - metadata = [{**m1, **m2} for m1, m2 in zip(klaus, metadata)] + metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)] if operation is Nothing: return Nothing @@ -98,21 +95,15 @@ def targets(data_message_pairs, input_data_items, operation, metadata): response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata))))) queue_message_keys = second(first(pair_data_with_queue_message([b""]))).keys() - print(queue_message_keys) response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata) expected = lzip(response_data, response_metadata) except ValueError: - print() - print(input_data_items) - print(metadata) expected = [] - print("expected", expected) return expected - @pytest.fixture def endpoint(url): return f"{url}/submit" @@ -156,8 +147,8 @@ def images_to_bytes(images): @pytest.fixture -def metadata(): - return repeat({"key": "value"}) +def metadata(n_items): + return list(repeat({"key": "value"}, times=n_items)) @pytest.fixture diff --git a/test/fixtures/server.py b/test/fixtures/server.py index b238651..1508396 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -63,14 +63,12 @@ def operation(core_operation): def op(data, metadata): assert isinstance(metadata, dict) result = core_operation(data, metadata) - print("result", result, type(result)) if isinstance(result, Generator): for data, metadata in result: - print(5555555555555555555555555555555) yield data, omit(metadata, ["pages", "operation"]) else: data, metadata = result - return data, omit(metadata, ["pages", "operation"]) + yield data, omit(metadata, ["pages", "operation"]) if core_operation is Nothing: return Nothing @@ -87,11 +85,8 @@ def core_operation(item_type, one_to_many, analysis_task): return string.decode().upper().encode(), metadata def extract(string: bytes, metadata): - print() - print("metadata", metadata) for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items(): metadata["id"] = i - print("XYZ", metadata) yield c.encode(), metadata def rotate(im: bytes, metadata): diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 325ed7b..d914252 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -1,5 +1,5 @@ import pytest -from funcy import rcompose, compose +from funcy import rcompose, compose, project, second, merge from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.client_pipeline import ClientPipeline @@ -15,6 +15,7 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic from pyinfra.utils.func import lift, llift +from test.utils.input import pair_data_with_queue_message def test_mock_pipeline(): @@ -31,11 +32,21 @@ def test_mock_pipeline(): @pytest.mark.parametrize("client_pipeline_type", ["rest", "basic"]) def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, n_items): assert core_operation is not Nothing + + metadata = add_queue_metadata_to_server_side_metadata(metadata) + output = compose(llift(unpack), client_pipeline)(input_data_items, metadata) assert n_items == 0 or len(output) > 0 assert output == targets +def add_queue_metadata_to_server_side_metadata(metadata): + return [ + merge(project(second(mdt), [*mdt_o.keys(), "pages"]), mdt_o) + for mdt, mdt_o in zip(pair_data_with_queue_message(metadata), metadata) + ] + + @pytest.mark.parametrize("item_type", ["string"]) @pytest.mark.parametrize("n_items", [1]) def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size):