From 6cb13051eb9d53bf2e665269a9fdaf24277fb678 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 17 May 2022 21:48:16 +0200 Subject: [PATCH] fixed following bugs: - upper() did yield instead of return - metadata was not repeated when zipping with results generator - since test metadata was empty dict, target data was therefore empty always, since results were zipped with {} - hence added check for target lengths > 0 - fixed return value of queued stream function dispatcher; only returned first item of 1 -> n results --- pyinfra/server/client_pipeline.py | 1 + pyinfra/server/dispatcher/dispatchers/queue.py | 11 +++++++++-- test/conftest.py | 2 +- test/fixtures/input.py | 2 +- test/fixtures/server.py | 6 ++++-- test/unit_tests/server/pipeline_test.py | 6 +++++- 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pyinfra/server/client_pipeline.py b/pyinfra/server/client_pipeline.py index 4ebde5a..cd9c56e 100644 --- a/pyinfra/server/client_pipeline.py +++ b/pyinfra/server/client_pipeline.py @@ -1,6 +1,7 @@ from funcy import rcompose, flatten +# TODO: remove the dispatcher component from the pipeline; it no longer actually dispatches class ClientPipeline: def __init__(self, packer, dispatcher, receiver, interpreter): self.pipe = rcompose( diff --git a/pyinfra/server/dispatcher/dispatchers/queue.py b/pyinfra/server/dispatcher/dispatchers/queue.py index b61374b..347c917 100644 --- a/pyinfra/server/dispatcher/dispatchers/queue.py +++ b/pyinfra/server/dispatcher/dispatchers/queue.py @@ -1,3 +1,7 @@ +from itertools import takewhile + +from funcy import repeatedly, notnone + from pyinfra.server.dispatcher.dispatcher import Dispatcher from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction @@ -8,7 +12,10 @@ class QueuedStreamFunctionDispatcher(Dispatcher): def patch(self, package): self.queued_stream_function.push(package) - return self.queued_stream_function.pop() + # TODO: this is wonky and a result of the pipeline components having 
shifted behaviour through previous + # refactorings. The analogous functionality for the rest pipeline is in the interpreter. Correct this + # asymmetry! + yield from takewhile(notnone, repeatedly(self.queued_stream_function.pop)) def post(self, package): - return self.patch(package) + yield from self.patch(package) diff --git a/test/conftest.py b/test/conftest.py index 8c19604..660f016 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -36,7 +36,7 @@ logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1) logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1) -@pytest.fixture(autouse=False) +@pytest.fixture(autouse=True) def mute_logger(): logger.setLevel(logging.CRITICAL + 1) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 2024889..07e3fd9 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -93,7 +93,7 @@ def images(n_items): @pytest.fixture def metadata(n_items): - return list(repeat({}, n_items)) + return list(repeat({"key": "value"}, n_items)) @pytest.fixture diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 65ccb7a..ce2a1ed 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -1,5 +1,6 @@ import io import socket +from itertools import repeat from multiprocessing import Process from typing import Generator @@ -58,9 +59,10 @@ def operation_conditionally_batched(operation, batched): @pytest.fixture def operation(core_operation): def op(data, metadata): + assert isinstance(metadata, dict) result = core_operation(data) if isinstance(result, Generator): - return zip(result, metadata) + return zip(result, repeat(metadata)) else: return result, metadata @@ -72,7 +74,7 @@ def operation(core_operation): @pytest.fixture def core_operation(item_type, one_to_many): def upper(string: bytes): - yield string.decode().upper().encode() + return string.decode().upper().encode() def duplicate(string: bytes): for _ in range(2): diff --git 
a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 8d7fcd0..aae4717 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -29,10 +29,14 @@ def test_mock_pipeline(): @pytest.mark.parametrize("client_pipeline_type", ["rest", "basic"]) -def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many): +def test_pipeline( + core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many, n_items +): if core_operation is Nothing: pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}") output = compose(llift(unpack), client_pipeline)(input_data_items, metadata) + if n_items > 0: + assert len(output) > 0 assert output == targets