diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 16945ee..42ab636 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -25,32 +25,31 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched): return fn -class LazyProcessor: - """Accepts computation requests (push) and lazily produces results (pop).""" - def __init__(self, flat_stream_buffer: FlatStreamBuffer): +class QueuedFunction: + def __init__(self, fn): self.queue = Queue() - self.flat_stream_buffer = flat_stream_buffer + self.fn = fn def push(self, item): self.queue.append(item) def pop(self): items = stream_queue(self.queue) - return first(self.flat_stream_buffer(items)) + return self.fn(items) class LazyRestProcessor: - def __init__(self, lazy_processor: LazyProcessor, submit_suffix="submit", pickup_suffix="pickup"): + def __init__(self, queued_function: QueuedFunction, submit_suffix="submit", pickup_suffix="pickup"): self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix - self.lazy_processor = lazy_processor + self.queued_function = queued_function def push(self, request): - self.lazy_processor.push(request.json) + self.queued_function.push(request.json) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pop(self): - result = self.lazy_processor.pop() or Nothing + result = self.queued_function.pop() or Nothing if not valid(result): logger.error(f"Received invalid result: {result}") @@ -71,9 +70,9 @@ def valid(result): return isinstance(result, dict) or result is Nothing -def set_up_processing_server(lazy_processor: LazyProcessor): +def set_up_processing_server(queued_function: QueuedFunction): app = Flask(__name__) - processor = LazyRestProcessor(lazy_processor, submit_suffix="submit", pickup_suffix="pickup") + processor = LazyRestProcessor(queued_function, submit_suffix="submit", pickup_suffix="pickup") @app.route("/ready", methods=["GET"]) def ready(): diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 393e653..7fa4ebb 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -6,11 +6,11 @@ import fitz import pytest import requests from PIL import Image -from funcy import retry +from funcy import retry, first, compose from waitress import serve from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer -from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, LazyProcessor +from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, QueuedFunction from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes @@ -39,8 +39,8 @@ def url(host, port): @pytest.fixture def server(processor_fn, buffer_size): flat_stream_buffer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size) - lazy_processor = LazyProcessor(flat_stream_buffer) - return set_up_processing_server(lazy_processor) + queued_function = QueuedFunction(compose(first, flat_stream_buffer)) + return set_up_processing_server(queued_function) @pytest.fixture diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py index fb43386..ce6efa8 100644 --- a/test/unit_tests/server/stream_buffer_test.py +++ b/test/unit_tests/server/stream_buffer_test.py @@ -3,14 +3,15 @@ from funcy import repeatedly, takewhile, notnone, lmap, lmapcat from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer, StreamBuffer from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.server import LazyProcessor +from pyinfra.server.server import QueuedFunction from pyinfra.utils.func import lift, foreach @pytest.fixture -def func(): +def func(one_to_many): def fn(x): - return x ** 2 + y = x ** 2 + return y if not one_to_many else (y, y) return fn @@ -38,9 +39,9 @@ def test_flat_stream_buffer(func, inputs, outputs, buffer_size): assert list(flat_stream_buffer([])) == [] -def test_lazy_processor(func, inputs, outputs): - stream_buffer = FlatStreamBuffer(lift(func)) - lazy_processor = LazyProcessor(stream_buffer) +def test_queued_function(func, inputs, outputs): + + lazy_processor = QueuedFunction(lift(func)) foreach(lazy_processor.push, inputs)