diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 06727a0..25636d0 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -87,37 +87,66 @@ class StreamBuffer: return first(chain(self.result_stream, [Nothing])) -class QueueBufferCoupler: - def __init__(self, queue: Queue, buffer: StreamBuffer): - self.queue = queue - self.buffer = buffer +# class QueueBufferCoupler: +# def __init__(self, queue: Queue, buffer: StreamBuffer): +# self.queue = queue +# self.buffer = buffer +# +# def __call__(self): +# return self.compute_next() +# +# def compute_next(self): +# return next(self.compute()) +# +# def compute(self): +# yield from self.__consume_queue() +# yield from self.__flush_buffer() +# yield self.__signal_termination() +# +# def __consume_queue(self): +# queue_items = stream_queue(self.queue) +# yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items)) +# +# def __flush_buffer(self): +# yield from self.__add_to_buffer_and_consume(Nothing) +# +# def __add_to_buffer_and_consume(self, queue_item): +# self.buffer.push(queue_item) +# yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop)) +# +# @staticmethod +# def __signal_termination(): +# return Nothing - def __call__(self): - return self.compute_next() - def compute_next(self): - return next(self.compute()) +def make_queue_consumer(buffer: StreamBuffer): + def call(queue): + return compute_next(queue) - def compute(self): - yield from self.__consume_queue() - yield from self.__flush_buffer() - yield self.__signal_termination() + def compute_next(queue): + return next(compute(queue)) - def __consume_queue(self): - queue_items = stream_queue(self.queue) - yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items)) + def compute(queue): + yield from consume_queue(queue) + yield from flush_buffer() + yield signal_termination() - def __flush_buffer(self): - yield from self.__add_to_buffer_and_consume(Nothing) + def consume_queue(queue): + queue_items = stream_queue(queue) + yield from chain.from_iterable(map(add_to_buffer_and_consume, queue_items)) - def __add_to_buffer_and_consume(self, queue_item): - self.buffer.push(queue_item) - yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop)) + def flush_buffer(): + yield from add_to_buffer_and_consume(Nothing) - @staticmethod - def __signal_termination(): + def signal_termination(): return Nothing + def add_to_buffer_and_consume(queue_item): + buffer.push(queue_item) + yield from takewhile(is_not_nothing, repeatedly(buffer.pop)) + + return call + def stream_queue(queue): yield from takewhile(is_not_nothing, repeatedly(queue.popleft)) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 9e77176..f561113 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -4,7 +4,8 @@ from itertools import chain from flask import Flask, jsonify, request from funcy import compose, identity -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue, \ + make_queue_consumer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -43,14 +44,16 @@ class RestStreamProcessor: self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix self.queue = Queue() - self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn)) + # self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn)) + # self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue))) + self.fn = make_queue_consumer(StreamBuffer(fn)) def submit(self, request): self.queue.append(request.json) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pickup(self): - result = self.processor() + result = self.fn(self.queue) if not valid(result): logger.error(f"Received invalid result: {result}")