pyinfra/test/unit_tests/server/processor_test.py
2022-05-07 00:23:22 +02:00

21 lines
493 B
Python

from itertools import takewhile
from funcy import repeatedly
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.processor import OnDemandProcessor
def test_processor():
    """Check that an OnDemandProcessor yields the squares of submitted items.

    The integers 0..9 are submitted to a processor wrapping a squaring
    function; results are then pulled via ``compute_next`` until the
    ``Nothing`` sentinel shows up, and compared against the expected
    squares in submission order.
    """

    def square_as_singleton(value):
        # Returns a one-element list; the assertion below expects bare
        # integers, so the processor presumably unpacks it (TODO confirm).
        return [value ** 2]

    on_demand = OnDemandProcessor(square_as_singleton)

    for number in range(10):
        on_demand.submit(number)

    # Keep calling compute_next lazily and stop at the first Nothing.
    pulled_results = repeatedly(on_demand.compute_next)
    until_exhausted = takewhile(lambda result: result != Nothing, pulled_results)
    collected = list(until_exhausted)

    expected_squares = [number ** 2 for number in range(10)]
    assert collected == expected_squares