diff --git a/pyinfra/server/processor/__init__.py b/pyinfra/server/processor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/server/processor/processor.py b/pyinfra/server/processor/processor.py new file mode 100644 index 0000000..482846d --- /dev/null +++ b/pyinfra/server/processor/processor.py @@ -0,0 +1,21 @@ +from itertools import chain +from typing import Union, Any + +from pyinfra.server.dispatcher.dispatcher import Nothing + + +class OnDemandProcessor: + def __init__(self, processor_fn): + self.execution_queue = chain([]) + self.processor_fn = processor_fn + + def submit(self, package, final) -> None: + """Submit computation request to execution queue; computation is performed on demand.""" + self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final)) + + def compute_next(self) -> Union[Nothing, Any]: + """Processes the next request.""" + try: + return next(self.execution_queue) + except StopIteration: + return Nothing diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index b860964..b0b8df0 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,12 +1,10 @@ -import abc import logging -from itertools import chain -from typing import Union, Any from flask import Flask, jsonify, request from funcy import compose, flatten from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.server.processor.processor import OnDemandProcessor from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack from pyinfra.utils.buffer import bufferize from pyinfra.utils.func import lift @@ -14,7 +12,7 @@ from pyinfra.utils.func import lift logger = logging.getLogger() -def make_processor_fn(operation, buffer_size, batched): +def make_streamable(operation, buffer_size, batched): wrapper = unpack_batchop_pack if batched else compose(lift, unpack_op_pack) operation = wrapper(operation) @@ -24,43 +22,26 @@ def make_processor_fn(operation, buffer_size, batched): return operation -class OnDemandProcessor: +class RestOndDemandProcessor(OnDemandProcessor): def __init__(self, processor_fn): - self.execution_queue = chain([]) - self.processor_fn = processor_fn - - def submit(self, package, final) -> None: - """Submit computation request to execution queue; computation is performed on demand.""" - self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final)) - - def compute_next(self) -> Union[Nothing, Any]: - """Processes the next request.""" - try: - return next(self.execution_queue) - except StopIteration: - return Nothing - - -class RestOndDemandProcessorAdapter(OnDemandProcessor): - def __init__(self, processor_fn): - super(RestOndDemandProcessorAdapter, self).__init__(processor_fn=processor_fn) + super(RestOndDemandProcessor, self).__init__(processor_fn=processor_fn) def submit(self, request, **kwargs) -> None: - super(RestOndDemandProcessorAdapter, self).submit(request.json, final=request.method == "POST") + super(RestOndDemandProcessor, self).submit(request.json, final=request.method == "POST") -class RestStreamer: +class RestStreamProcessor(RestOndDemandProcessor): """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" - def __init__(self, processor): - self.processor = processor + def __init__(self, processor_fn): + super(RestStreamProcessor, self).__init__(processor_fn=processor_fn) - def submit(self, request): - self.processor.submit(request) + def submit(self, request, **kwargs): + super(RestStreamProcessor, self).submit(request) return jsonify(f"{request.base_url.replace('/submit', '')}/pickup") def pickup(self): - result = self.processor.compute_next() + result = self.compute_next() if not valid(result): logger.error(f"Received invalid result: {result}") @@ -83,7 +64,7 @@ def valid(result): def set_up_processing_server(process_fn): app = Flask(__name__) - streamer = RestStreamer(RestOndDemandProcessorAdapter(process_fn)) + stream = RestStreamProcessor(process_fn) @app.route("/ready", methods=["GET"]) def ready(): @@ -93,10 +74,10 @@ def set_up_processing_server(process_fn): @app.route("/submit", methods=["POST", "PATCH"]) def submit(): - return streamer.submit(request) + return stream.submit(request) @app.route("/pickup", methods=["GET"]) def pickup(): - return streamer.pickup() + return stream.pickup() return app diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 3ebaedb..b212846 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -9,7 +9,7 @@ from PIL import Image from funcy import retry from waitress import serve -from pyinfra.server.server import set_up_processing_server, make_processor_fn +from pyinfra.server.server import set_up_processing_server, make_streamable from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes @@ -42,7 +42,7 @@ def server(processor_fn): @pytest.fixture def processor_fn(operation_conditionally_batched, buffer_size, batched): - return make_processor_fn(operation_conditionally_batched, buffer_size, batched) + return make_streamable(operation_conditionally_batched, buffer_size, batched) @pytest.fixture