fixed bug in compute_next of on demand processor that skipped all but the first return value of 1 -> n functions

This commit is contained in:
Matthias Bisping 2022-05-10 11:07:54 +02:00
parent 3dba896038
commit 949413af4a
3 changed files with 10 additions and 7 deletions

View File

@ -12,15 +12,20 @@ class OnDemandProcessor:
def __init__(self, fn):
"""Function `fn` has to return an iterable and ideally is a generator."""
self.execution_queue = chain([])
self.fn = bufferize(fn)
self.fn = bufferize(fn, null_value=[])
self.result_stream = chain([])
def submit(self, package, **kwargs) -> None:
self.execution_queue = chain(self.execution_queue, [package])
def compute_next(self) -> Union[Nothing, Any]:
return next(self.compute())
try:
return next(self.result_stream)
except StopIteration:
self.result_stream = chain(self.result_stream, self.compute())
return self.compute_next()
def compute(self):
items = chain(self.execution_queue, [Nothing])
yield from compose(flatten, compact, lift(self.fn))(items)
yield from compose(flatten, lift(self.fn))(items)
yield Nothing

View File

@ -7,7 +7,7 @@ from test.utils.pdf import pdf_stream
@pytest.fixture
def pdf(n_pages):
pdf = fpdf.FPDF(unit="pt")
for _ in range(n_pages + 1):
for _ in range(n_pages):
pdf.add_page()
return pdf_stream(pdf)

View File

@ -1,15 +1,14 @@
import pytest
from funcy import rcompose, lmap
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.dispatcher.dispatchers.forwarding import ForwardingDispatcher
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.receiver.receivers.identity import IdentityReceiver
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.server import make_streamable
from pyinfra.server.utils import unpack
from pyinfra.utils.func import lift
@ -32,7 +31,6 @@ def test_mock_pipeline():
# "basic",
],
)
@pytest.mark.parametrize("item_type", ["string"])
def test_pipeline(client_pipeline, input_data_items, metadata, target_data_items):
output = client_pipeline(input_data_items, metadata)
output = lmap(unpack, output)