diff --git a/pyinfra/server/dispatcher/dispatchers/forwarding.py b/pyinfra/server/dispatcher/dispatchers/forwarding.py index 68306e3..390cc52 100644 --- a/pyinfra/server/dispatcher/dispatchers/forwarding.py +++ b/pyinfra/server/dispatcher/dispatchers/forwarding.py @@ -10,5 +10,4 @@ class QueuedStreamFunctionDispatcher(Dispatcher): return self.queue.pop() def post(self, package): - self.queue.push(package) - return self.queue.pop() + return self.patch(package) diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 0a3bcc9..ec48807 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -1,5 +1,5 @@ import pytest -from funcy import rcompose, lmap +from funcy import rcompose, compose from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.client_pipeline import ClientPipeline @@ -13,7 +13,8 @@ from pyinfra.server.packing import unpack from pyinfra.server.receiver.receivers.identity import QueuedStreamFunctionReceiver from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction -from pyinfra.utils.func import lift +from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic +from pyinfra.utils.func import lift, llift def test_mock_pipeline(): @@ -37,31 +38,26 @@ def test_mock_pipeline(): def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many): if core_operation is Nothing: pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}") - output = client_pipeline(input_data_items, metadata) - output = lmap(unpack, output) + output = compose(llift(unpack), client_pipeline)(input_data_items, metadata) assert output == targets -# @pytest.mark.parametrize("item_type", ["string"]) -# @pytest.mark.parametrize("n_items", [1]) -# def test_pipeline_is_lazy(input_data_items, metadata): -# def lazy_test_fn(*args, **kwargs): -# probe["executed"] = True -# return b"null", {} -# -# probe = {"executed": False} -# processor_fn = make_streamable(lazy_test_fn, buffer_size=3, batched=False) -# -# client_pipeline = ClientPipeline( -# RestPacker(), ForwardingDispatcher(processor_fn), IdentityReceiver(), IdentityInterpreter() -# ) -# output = client_pipeline(input_data_items, metadata) -# -# assert not probe["executed"] -# -# list(output) -# -# assert probe["executed"] +@pytest.mark.parametrize("item_type", ["string"]) +@pytest.mark.parametrize("n_items", [1]) +def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size): + def lazy_test_fn(*args, **kwargs): + probe["executed"] = True + return b"null", {} + + probe = {"executed": False} + stream_function = make_streamable_and_wrap_in_packing_logic(lazy_test_fn, batched=False) + + client_pipeline = get_basic_client_pipeline(stream_function, buffer_size=buffer_size) + output = client_pipeline(input_data_items, metadata) + + assert not probe["executed"] + list(output) + assert probe["executed"] @pytest.fixture @@ -80,13 +76,21 @@ def rest_client_pipeline(server_process, endpoint, rest_interpreter): @pytest.fixture def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function, buffer_size): + return get_basic_client_pipeline(server_stream_function, buffer_size) - flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) - queued_stream_function = QueuedStreamFunction(flat_stream_buffer) + +def get_basic_client_pipeline(stream_function, buffer_size=3): return ClientPipeline( RestPacker(), - QueuedStreamFunctionDispatcher(queued_stream_function), + QueuedStreamFunctionDispatcher( + QueuedStreamFunction( + FlatStreamBuffer( + stream_function, + buffer_size=buffer_size, + ), + ), + ), QueuedStreamFunctionReceiver(), IdentityInterpreter(), )