diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 9562159..a9bd2a0 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -3,12 +3,13 @@ from itertools import starmap, repeat import numpy as np import pytest from PIL import Image -from funcy import lmap, compose, flatten +from funcy import lmap, compose, flatten, lflatten from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import pack, unpack, normalize_item from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes +from test.utils.server import string_to_bytes @pytest.fixture @@ -32,9 +33,22 @@ def input_data_items(item_type, n_items, pdf): @pytest.fixture -def targets(input_data_items, item_type, operation, metadata): +def target_data_items(input_data_items, core_operation): + + if core_operation is Nothing: + return Nothing + + op = compose(normalize_item, core_operation) + expected = lflatten(map(op, input_data_items)) + return expected + + +@pytest.fixture +def targets(input_data_items, operation, metadata): + if operation is Nothing: return Nothing + op = compose(lift(star(pack)), normalize_item, operation) expected = lmap(unpack, flatten(starmap(op, zip(input_data_items, metadata)))) return expected diff --git a/test/fixtures/server.py b/test/fixtures/server.py index dd277be..1e89047 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -72,10 +72,6 @@ def operation(core_operation): return op -class InvalidParameterCombination(ValueError): - pass - - @pytest.fixture def core_operation(item_type, one_to_many): def upper(string: bytes): diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 6ba291f..c881b63 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -12,7 +12,6 @@ from pyinfra.server.receiver.receivers.identity import IdentityReceiver from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.utils import unpack from pyinfra.utils.func import lift -from test.fixtures.server import InvalidParameterCombination def test_mock_pipeline(): @@ -26,16 +25,6 @@ def test_mock_pipeline(): assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data)) -def pass_invalid_combination(fn): - def inner(*args, **kwargs): - try: - return fn(*args, **kwargs) - except InvalidParameterCombination: - pass - - return inner - - @pytest.mark.parametrize( "client_pipeline_type", [ diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py index 2ee429c..662ce3c 100644 --- a/test/unit_tests/server/stream_buffer_test.py +++ b/test/unit_tests/server/stream_buffer_test.py @@ -39,6 +39,17 @@ def test_flat_stream_buffer(func, inputs, outputs, buffer_size): assert list(flat_stream_buffer([])) == [] +def test_flat_stream_buffer_on_different_data(core_operation, input_data_items, target_data_items, buffer_size, item_type, one_to_many): + + if core_operation is Nothing: + pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}") + + flat_stream_buffer = FlatStreamBuffer(lift(core_operation), buffer_size=buffer_size) + + assert list(flat_stream_buffer(input_data_items)) == target_data_items + assert list(flat_stream_buffer([])) == [] + + def test_queued_stream_function(func, inputs, outputs): queued_stream_function = QueuedStreamFunction(lift(func))