diff --git a/pyinfra/utils/buffer.py b/pyinfra/server/processor/buffer.py similarity index 100% rename from pyinfra/utils/buffer.py rename to pyinfra/server/processor/buffer.py diff --git a/pyinfra/server/processor/processor.py b/pyinfra/server/processor/lazy_bufferizer.py similarity index 54% rename from pyinfra/server/processor/processor.py rename to pyinfra/server/processor/lazy_bufferizer.py index adaf120..be11701 100644 --- a/pyinfra/server/processor/processor.py +++ b/pyinfra/server/processor/lazy_bufferizer.py @@ -1,18 +1,21 @@ +import logging from itertools import chain from typing import Union, Any -from funcy import flatten, compose, compact +from funcy import flatten, compose from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.utils.buffer import bufferize +from pyinfra.server.processor.buffer import bufferize from pyinfra.utils.func import lift +logger = logging.getLogger(__name__) -class OnDemandProcessor: - def __init__(self, fn): - """Function `fn` has to return an iterable and ideally is a generator.""" + +class LazyBufferizer: + def __init__(self, fn, buffer_size=3): + """Function `fn` has to return an iterable, ideally is a generator and needs to be mappable.""" self.execution_queue = chain([]) - self.fn = bufferize(fn, null_value=[]) + self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.result_stream = chain([]) def submit(self, package, **kwargs) -> None: @@ -23,7 +26,9 @@ class OnDemandProcessor: return next(self.result_stream) except StopIteration: self.result_stream = chain(self.result_stream, self.compute()) - return self.compute_next() + return self.compute_next() # Produces stream of `Nothing` if execution queue is empty + except TypeError as err: + raise TypeError("Function failed with type error. Is it mappable?") from err def compute(self): items = chain(self.execution_queue, [Nothing]) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 5ecfc0e..d684525 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -5,7 +5,7 @@ from flask import Flask, jsonify, request from funcy import rcompose, compose, first, chunks, flatten from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.processor.processor import OnDemandProcessor +from pyinfra.server.processor.lazy_bufferizer import LazyBufferizer from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -32,7 +32,7 @@ def make_streamable(operation, batched, batch_size): return operation -class RestStreamProcessor(OnDemandProcessor): +class RestStreamProcessor(LazyBufferizer): """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"): diff --git a/test/conftest.py b/test/conftest.py index d1c9da6..d5c3f5b 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -31,7 +31,11 @@ pytest_plugins = [ ] -@pytest.fixture(autouse=True) +logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1) +logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1) + + +@pytest.fixture(autouse=False) def mute_logger(): logger.setLevel(logging.CRITICAL + 1) diff --git a/test/unit_tests/server/buffer_test.py b/test/unit_tests/server/buffer_test.py index 4942fdf..66fd186 100644 --- a/test/unit_tests/server/buffer_test.py +++ b/test/unit_tests/server/buffer_test.py @@ -1,7 +1,7 @@ from funcy import compose, lmapcat, compact, flatten from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.utils.buffer import bufferize +from pyinfra.server.processor.buffer import bufferize def test_buffer(): diff --git a/test/unit_tests/server/lazy_bufferizer_test.py b/test/unit_tests/server/lazy_bufferizer_test.py new file mode 100644 index 0000000..e035172 --- /dev/null +++ b/test/unit_tests/server/lazy_bufferizer_test.py @@ -0,0 +1,44 @@ +from itertools import takewhile + +import pytest +from funcy import repeatedly + +from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.server.processor.lazy_bufferizer import LazyBufferizer +from pyinfra.utils.func import lift + + +def func(x): + return [x ** 2] + + +def test_lazy_bufferizer(): + + lazy_bufferizer = LazyBufferizer(lift(func)) + + for i in range(10): + lazy_bufferizer.submit(i) + + output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next))) + + assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] + + +def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty(): + + lazy_bufferizer = LazyBufferizer(lift(func)) + assert lazy_bufferizer.compute_next() is Nothing + assert lazy_bufferizer.compute_next() is Nothing + + lazy_bufferizer.submit(2) + assert lazy_bufferizer.compute_next() == 4 + assert lazy_bufferizer.compute_next() is Nothing + + +def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable(): + + lazy_bufferizer = LazyBufferizer(func) + + with pytest.raises(TypeError): + lazy_bufferizer.submit(2) + assert lazy_bufferizer.compute_next() == 4 diff --git a/test/unit_tests/server/processor_test.py b/test/unit_tests/server/processor_test.py deleted file mode 100644 index d19a0f2..0000000 --- a/test/unit_tests/server/processor_test.py +++ /dev/null @@ -1,20 +0,0 @@ -from itertools import takewhile - -from funcy import repeatedly - -from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.processor.processor import OnDemandProcessor - - -def test_processor(): - def func(x): - return [x ** 2] - - processor = OnDemandProcessor(func) - - for i in range(10): - processor.submit(i) - - output = list(takewhile(lambda r: r != Nothing, repeatedly(processor.compute_next))) - - assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]