adjusted stream buffer test for core-operations taking tuples now
This commit is contained in:
parent
c55e41f2d8
commit
91701929e5
@ -38,7 +38,7 @@ class StreamBuffer:
|
||||
try:
|
||||
yield from self.fn(item)
|
||||
except TypeError as err:
|
||||
raise TypeError("Function failed with type error. Is it mappable?") from err
|
||||
raise TypeError("Function failed with type-error. Is it mappable?") from err
|
||||
|
||||
def pop(self):
|
||||
return first(chain(self.result_stream, [Nothing]))
|
||||
|
||||
@ -26,11 +26,13 @@ logging.basicConfig()
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
# TODO: refactor all fixtures into cleanly separated modules
|
||||
pytest_plugins = [
|
||||
"test.fixtures.consumer",
|
||||
"test.fixtures.input",
|
||||
"test.fixtures.pdf",
|
||||
"test.fixtures.server",
|
||||
"test.integration_tests.serve_test",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@ -81,8 +81,12 @@ def test_serving(server_process, bucket_name, components, targets, data_message_
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data_message_pairs(input_data_items, metadata):
|
||||
data_metadata_packs = starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata))
|
||||
def data_metadata_packs(input_data_items, metadata):
|
||||
return list(starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata)))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data_message_pairs(data_metadata_packs):
|
||||
data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
|
||||
return data_message_pairs
|
||||
|
||||
|
||||
@ -4,7 +4,7 @@ from funcy import repeatedly, takewhile, notnone, lmap, lmapcat, lflatten
|
||||
from pyinfra.server.buffering.stream import FlatStreamBuffer, StreamBuffer
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
from pyinfra.utils.func import lift, foreach
|
||||
from pyinfra.utils.func import lift, foreach, starlift
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -40,15 +40,15 @@ def test_flat_stream_buffer(func, inputs, outputs, buffer_size):
|
||||
|
||||
|
||||
def test_flat_stream_buffer_on_different_data(
|
||||
core_operation, input_data_items, target_data_items, buffer_size, item_type, one_to_many
|
||||
core_operation, input_data_items, metadata, target_data_items, buffer_size, item_type, one_to_many
|
||||
):
|
||||
|
||||
if core_operation is Nothing:
|
||||
pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}")
|
||||
|
||||
flat_stream_buffer = FlatStreamBuffer(lift(core_operation), buffer_size=buffer_size)
|
||||
flat_stream_buffer = FlatStreamBuffer(starlift(core_operation), buffer_size=buffer_size)
|
||||
|
||||
assert list(flat_stream_buffer(input_data_items)) == target_data_items
|
||||
assert list(flat_stream_buffer(zip(input_data_items, metadata))) == target_data_items
|
||||
assert list(flat_stream_buffer([])) == []
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user