pipelin laziness test works again

This commit is contained in:
Matthias Bisping 2022-05-17 10:44:43 +02:00
parent 9c262e7138
commit 5590669939
2 changed files with 32 additions and 29 deletions

View File

@ -10,5 +10,4 @@ class QueuedStreamFunctionDispatcher(Dispatcher):
return self.queue.pop()
def post(self, package):
self.queue.push(package)
return self.queue.pop()
return self.patch(package)

View File

@ -1,5 +1,5 @@
import pytest
from funcy import rcompose, lmap
from funcy import rcompose, compose
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
@ -13,7 +13,8 @@ from pyinfra.server.packing import unpack
from pyinfra.server.receiver.receivers.identity import QueuedStreamFunctionReceiver
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.utils.func import lift
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from pyinfra.utils.func import lift, llift
def test_mock_pipeline():
@ -37,31 +38,26 @@ def test_mock_pipeline():
def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many):
if core_operation is Nothing:
pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}")
output = client_pipeline(input_data_items, metadata)
output = lmap(unpack, output)
output = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
assert output == targets
# @pytest.mark.parametrize("item_type", ["string"])
# @pytest.mark.parametrize("n_items", [1])
# def test_pipeline_is_lazy(input_data_items, metadata):
# def lazy_test_fn(*args, **kwargs):
# probe["executed"] = True
# return b"null", {}
#
# probe = {"executed": False}
# processor_fn = make_streamable(lazy_test_fn, buffer_size=3, batched=False)
#
# client_pipeline = ClientPipeline(
# RestPacker(), ForwardingDispatcher(processor_fn), IdentityReceiver(), IdentityInterpreter()
# )
# output = client_pipeline(input_data_items, metadata)
#
# assert not probe["executed"]
#
# list(output)
#
# assert probe["executed"]
@pytest.mark.parametrize("item_type", ["string"])
@pytest.mark.parametrize("n_items", [1])
def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size):
def lazy_test_fn(*args, **kwargs):
probe["executed"] = True
return b"null", {}
probe = {"executed": False}
stream_function = make_streamable_and_wrap_in_packing_logic(lazy_test_fn, batched=False)
client_pipeline = get_basic_client_pipeline(stream_function, buffer_size=buffer_size)
output = client_pipeline(input_data_items, metadata)
assert not probe["executed"]
list(output)
assert probe["executed"]
@pytest.fixture
@ -80,13 +76,21 @@ def rest_client_pipeline(server_process, endpoint, rest_interpreter):
@pytest.fixture
def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function, buffer_size):
return get_basic_client_pipeline(server_stream_function, buffer_size)
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
def get_basic_client_pipeline(stream_function, buffer_size=3):
return ClientPipeline(
RestPacker(),
QueuedStreamFunctionDispatcher(queued_stream_function),
QueuedStreamFunctionDispatcher(
QueuedStreamFunction(
FlatStreamBuffer(
stream_function,
buffer_size=buffer_size,
),
),
),
QueuedStreamFunctionReceiver(),
IdentityInterpreter(),
)