pyinfra/test/unit_tests/server/stream_buffer_test.py
Matthias Bisping 092a0e2964 renaming
2022-05-13 17:16:27 +02:00

64 lines
1.6 KiB
Python

import pytest
from funcy import repeatedly, takewhile, notnone, lmap, lmapcat
from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer, StreamBuffer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.server import QueuedStreamFunction
from pyinfra.utils.func import lift, foreach
@pytest.fixture
def func(one_to_many):
    """Provide the squaring function that the tests in this module process items with.

    ``one_to_many`` is resolved by pytest (its definition is not visible in
    this file). When it is truthy, the returned function yields the square
    twice as a 2-tuple; otherwise it yields the bare square.
    """

    def fn(x):
        squared = x ** 2
        if one_to_many:
            # One input maps to two (identical) outputs.
            return (squared, squared)
        return squared

    return fn
def test_stream_buffer(func, inputs, outputs, buffer_size):
    """Feed the inputs plus a trailing ``Nothing`` through a ``StreamBuffer``.

    The concatenated results must equal ``outputs``, and sending a further
    lone ``Nothing`` afterwards must produce no additional items.
    """
    buffered = StreamBuffer(lift(func), buffer_size=buffer_size)

    # NOTE(review): `Nothing` presumably marks end-of-stream / triggers a
    # flush of the buffer -- confirm against StreamBuffer's implementation.
    stream = (*inputs, Nothing)
    collected = lmapcat(buffered, stream)
    assert collected == outputs

    # Nothing should be left over once the stream has been terminated.
    leftover = lmapcat(buffered, [Nothing])
    assert leftover == []
@pytest.mark.parametrize("n_items", [1])
def test_stream_buffer_catches_type_error(func, inputs, outputs):
    """Expect a ``TypeError`` when ``StreamBuffer`` wraps the un-lifted function.

    Unlike the other tests in this module, ``func`` is passed directly rather
    than through ``lift``. ``n_items`` is pinned to 1, so a single input is
    used. The ``outputs`` fixture is requested but not read by the body.
    """
    unlifted_buffer = StreamBuffer(func)

    with pytest.raises(TypeError):
        # No default buffer_size is overridden here; the constructor's own
        # default applies.
        lmapcat(unlifted_buffer, inputs)
def test_flat_stream_buffer(func, inputs, outputs, buffer_size):
    """Run the whole input sequence through a ``FlatStreamBuffer`` in one call.

    The materialised result must equal ``outputs``; calling the same buffer
    again with an empty sequence must yield nothing.
    """
    flat_buffer = FlatStreamBuffer(lift(func), buffer_size=buffer_size)

    produced = [*flat_buffer(inputs)]
    assert produced == outputs

    # An empty input stream must not emit any items.
    from_empty = [*flat_buffer([])]
    assert from_empty == []
def test_queued_function(func, inputs, outputs):
    """Push every input into a ``QueuedStreamFunction`` and pop the results.

    Results are popped repeatedly until the first ``None`` is returned; the
    items collected up to that point must equal ``outputs``.
    """
    queued = QueuedStreamFunction(lift(func))
    foreach(queued.push, inputs)

    # `repeatedly` calls pop() endlessly; `takewhile(notnone, ...)` stops at
    # the first None that pop() returns.
    pops = repeatedly(queued.pop)
    popped = list(takewhile(notnone, pops))

    assert popped == outputs
@pytest.fixture
def inputs(n_items):
    """Provide the test inputs: the integers ``0 .. n_items - 1`` as a ``range``.

    ``n_items`` is resolved by pytest (a parametrization or a fixture defined
    outside the visible part of this file).
    """
    return range(0, n_items)
@pytest.fixture
def outputs(inputs, func):
    """Provide the expected results: ``func`` applied to each input, as a list."""
    return [func(item) for item in inputs]
@pytest.fixture(params=[0, 1, 3, 10, 12])
def buffer_size(request):
    """Provide the buffer size for the current parametrized run.

    Every test requesting this fixture runs once per value in ``params``.
    """
    size = request.param
    return size