diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index f460a70..dcc8f09 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -22,10 +22,22 @@ class Queue: return self.__queue.popleft() if self.__queue else Nothing -class QueueProcessor: - def __init__(self, fn, queue: Queue): - self.fn = fn +class QueueStreamer: + def __init__(self, queue: Queue): self.queue = queue + + def __call__(self): + return self.stream_queue() + + def stream_queue(self): + yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft)) + yield Nothing + + +class OutputBuffer: + def __init__(self, fn, queue_streamer): + self.fn = fn + self.queue_streamer = queue_streamer self.result_stream = chain([]) def __call__(self): @@ -40,10 +52,6 @@ class QueueProcessor: except TypeError as err: raise TypeError("Function failed with type error. Is it mappable?") from err - def stream_queue(self): - yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft)) - yield Nothing - def compute(self): - yield from compose(flatten, lift(self.fn))(self.stream_queue()) + yield from compose(flatten, lift(self.fn))(self.queue_streamer()) yield Nothing diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 596ab48..bc7d4ab 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -4,7 +4,7 @@ from flask import Flask, jsonify, request from funcy import rcompose from pyinfra.server.bufferizer.buffer import bufferize -from pyinfra.server.bufferizer.lazy_bufferizer import QueueProcessor, Queue +from pyinfra.server.bufferizer.lazy_bufferizer import QueueStreamer, Queue, OutputBuffer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -49,7 +49,9 @@ class RestStreamProcessor: self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix self.queue = Queue() - self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=3, null_value=[]), queue=self.queue) + self.queue_processor = OutputBuffer( + bufferize(fn, buffer_size=3, null_value=[]), queue_streamer=QueueStreamer(self.queue) + ) def submit(self, request): self.queue.append(request.json)