diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 9fdb976..f460a70 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -5,30 +5,27 @@ from typing import Union, Any from funcy import flatten, compose, repeatedly -from pyinfra.server.bufferizer.buffer import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing from pyinfra.utils.func import lift logger = logging.getLogger(__name__) -class LazyBufferizer: - def __init__(self, fn, buffer_size=3): - """Function `fn` has to return an iterable, ideally is a generator and needs to be mappable.""" - self.input_queue = deque() - self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=buffer_size, null_value=[]), self.input_queue) +class Queue: + def __init__(self): + self.__queue = deque() - def submit(self, package, **kwargs) -> None: - self.input_queue.append(package) + def append(self, package) -> None: + self.__queue.append(package) - def compute_next(self): - return self.queue_processor.compute_next() + def popleft(self): + return self.__queue.popleft() if self.__queue else Nothing class QueueProcessor: - def __init__(self, fn, queue): + def __init__(self, fn, queue: Queue): self.fn = fn - self.input_queue = queue + self.queue = queue self.result_stream = chain([]) def __call__(self): @@ -43,11 +40,8 @@ class QueueProcessor: except TypeError as err: raise TypeError("Function failed with type error. Is it mappable?") from err - def consume_queue(self): - return self.input_queue.popleft() if self.input_queue else Nothing - def stream_queue(self): - yield from takewhile(is_not_nothing, repeatedly(self.consume_queue)) + yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft)) yield Nothing def compute(self): diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index c41c1b6..596ab48 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -3,7 +3,8 @@ import logging from flask import Flask, jsonify, request from funcy import rcompose -from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer +from pyinfra.server.bufferizer.buffer import bufferize +from pyinfra.server.bufferizer.lazy_bufferizer import QueueProcessor, Queue from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -11,6 +12,16 @@ from pyinfra.utils.func import starlift, lift logger = logging.getLogger() +class ServerPipeline: + def __init__(self): + """ + - dequeue + - + + """ + pass + + class StreamProcessor: def __init__(self, fn): """Function `fn` has to return an iterable and ideally is a generator.""" @@ -34,17 +45,18 @@ def make_streamable(operation, batched, batch_size): class RestStreamProcessor: """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" - def __init__(self, bufferizer, submit_suffix="submit", pickup_suffix="pickup"): - self.bufferizer = bufferizer + def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"): self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix + self.queue = Queue() + self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=3, null_value=[]), queue=self.queue) - def submit(self, request, **kwargs): - self.bufferizer.submit(request.json) + def submit(self, request): + self.queue.append(request.json) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pickup(self): - result = self.bufferizer.compute_next() + result = self.queue_processor() if not valid(result): logger.error(f"Received invalid result: {result}") @@ -67,7 +79,7 @@ def valid(result): def set_up_processing_server(process_fn): app = Flask(__name__) - stream = RestStreamProcessor(LazyBufferizer(process_fn), submit_suffix="submit", pickup_suffix="pickup") + stream = RestStreamProcessor(process_fn, submit_suffix="submit", pickup_suffix="pickup") @app.route("/ready", methods=["GET"]) def ready():