From bfdce62ccfc266e04e10abe2d3884e0990e0d06d Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 12 May 2022 19:46:29 +0200 Subject: [PATCH] refactoring; renaming --- pyinfra/server/bufferizer/lazy_bufferizer.py | 1 + pyinfra/server/server.py | 54 ++++++++++---------- test/fixtures/server.py | 13 +++-- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index a21d85f..9396b80 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -32,6 +32,7 @@ def make_buffered_consumer(fn, buffer_size=3): """Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the function needs to be called m - n times with `Nothing` to return the remaining values. """ + fn = StreamBuffer(fn, buffer_size=buffer_size) def consume(items): diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index c8ac436..83deb3f 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -4,7 +4,7 @@ from itertools import chain from flask import Flask, jsonify, request from funcy import compose, identity -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, make_buffered_consumer +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -12,16 +12,6 @@ from pyinfra.utils.func import starlift, lift logger = logging.getLogger() -class ServerPipeline: - def __init__(self): - """ - - dequeue - - - - """ - pass - - def make_streamable(fn, batched): return compose(normalize, (identity if batched else starlift)(fn)) @@ -30,28 +20,38 @@ def unpack_fn_pack(fn): return compose(starlift(pack), fn, lift(unpack)) -def make_streamable_and_wrap_in_packing_logic(fn, batched, batch_size): +def make_streamable_and_wrap_in_packing_logic(fn, batched): fn = make_streamable(fn, batched) fn = unpack_fn_pack(fn) return fn -class RestStreamProcessor: - """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" - - def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"): - self.submit_suffix = submit_suffix - self.pickup_suffix = pickup_suffix +class LazyProcessor: + def __init__(self, buffered_consumer): self.queue = Queue() - self.buffered_consumer = make_buffered_consumer(fn) + self.buffered_consumer = buffered_consumer - def submit(self, request): - self.queue.append(request.json) - return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) + def push(self, item): + self.queue.append(item) - def pickup(self): + def pop(self): items = chain(stream_queue(self.queue), [Nothing]) result = self.buffered_consumer(items) + return result + + +class LazyRestProcessor: + def __init__(self, lazy_processor, submit_suffix="submit", pickup_suffix="pickup"): + self.submit_suffix = submit_suffix + self.pickup_suffix = pickup_suffix + self.lazy_processor = lazy_processor + + def push(self, request): + self.lazy_processor.push(request.json) + return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) + + def pop(self): + result = self.lazy_processor.pop() if not valid(result): logger.error(f"Received invalid result: {result}") @@ -72,9 +72,9 @@ def valid(result): return isinstance(result, dict) or result is Nothing -def set_up_processing_server(process_fn): +def set_up_processing_server(package_processor): app = Flask(__name__) - stream = RestStreamProcessor(process_fn, submit_suffix="submit", pickup_suffix="pickup") + stream = LazyRestProcessor(package_processor, submit_suffix="submit", pickup_suffix="pickup") @app.route("/ready", methods=["GET"]) def ready(): @@ -84,10 +84,10 @@ def set_up_processing_server(process_fn): @app.route("/submit", methods=["POST", "PATCH"]) def submit(): - return stream.submit(request) + return stream.push(request) @app.route("/pickup", methods=["GET"]) def pickup(): - return stream.pickup() + return stream.pop() return app diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 494ddc0..4f0d3cb 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -9,7 +9,8 @@ from PIL import Image from funcy import retry from waitress import serve -from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic +from pyinfra.server.bufferizer.lazy_bufferizer import make_buffered_consumer +from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, LazyProcessor from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes @@ -36,13 +37,15 @@ def url(host, port): @pytest.fixture -def server(processor_fn): - return set_up_processing_server(processor_fn) +def server(processor_fn, buffer_size): + buffered_consumer = make_buffered_consumer(processor_fn, buffer_size=buffer_size) + lazy_processor = LazyProcessor(buffered_consumer) + return set_up_processing_server(lazy_processor) @pytest.fixture -def processor_fn(operation_conditionally_batched, buffer_size, batched): - return make_streamable_and_wrap_in_packing_logic(operation_conditionally_batched, batched, buffer_size) +def processor_fn(operation_conditionally_batched, batched): + return make_streamable_and_wrap_in_packing_logic(operation_conditionally_batched, batched) @pytest.fixture