45 lines
1.1 KiB
Python
45 lines
1.1 KiB
Python
from itertools import takewhile
|
|
|
|
import pytest
|
|
from funcy import repeatedly
|
|
|
|
from pyinfra.server.dispatcher.dispatcher import Nothing
|
|
from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer
|
|
from pyinfra.utils.func import lift
|
|
|
|
|
|
def func(x):
|
|
return [x ** 2]
|
|
|
|
|
|
def test_lazy_bufferizer():
|
|
|
|
lazy_bufferizer = LazyBufferizer(lift(func))
|
|
|
|
for i in range(10):
|
|
lazy_bufferizer.submit(i)
|
|
|
|
output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next)))
|
|
|
|
assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
|
|
|
|
|
|
def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty():
|
|
|
|
lazy_bufferizer = LazyBufferizer(lift(func))
|
|
assert lazy_bufferizer.compute_next() is Nothing
|
|
assert lazy_bufferizer.compute_next() is Nothing
|
|
|
|
lazy_bufferizer.submit(2)
|
|
assert lazy_bufferizer.compute_next() == 4
|
|
assert lazy_bufferizer.compute_next() is Nothing
|
|
|
|
|
|
def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable():
|
|
|
|
lazy_bufferizer = LazyBufferizer(func)
|
|
|
|
with pytest.raises(TypeError):
|
|
lazy_bufferizer.submit(2)
|
|
assert lazy_bufferizer.compute_next() == 4
|