diff --git a/pyinfra/server/bufferizer/buffer.py b/pyinfra/server/bufferizer/buffer.py index 7f6e1d9..3fe212b 100644 --- a/pyinfra/server/bufferizer/buffer.py +++ b/pyinfra/server/bufferizer/buffer.py @@ -14,9 +14,6 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None): if item is not Nothing: buffer.append(persist_fn(item)) - # print("ITEM", item) - # print(buffer) - response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing))) return response_payload or null_value diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 75a6897..a21d85f 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -2,7 +2,7 @@ import logging from collections import deque from itertools import chain, takewhile -from funcy import first, repeatedly, compose +from funcy import first, repeatedly from pyinfra.server.bufferizer.buffer import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing @@ -10,43 +10,8 @@ from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing logger = logging.getLogger(__name__) -# class OutputBuffer: -# def __init__(self, fn, queue_streamer): -# self.fn = fn -# self.queue_streamer = queue_streamer -# self.result_stream = chain([]) -# -# def __call__(self): -# return self.compute_next() -# -# def compute_next(self) -> Union[Nothing, Any]: -# try: -# return next(self.result_stream) -# except StopIteration: -# self.result_stream = chain(self.result_stream, self.compute()) -# return self.compute_next() # Produces stream of `Nothing` if execution queue is empty -# except TypeError as err: -# raise TypeError("Function failed with type error. Is it mappable?") from err -# -# def compute(self): -# yield from compose(flatten, lift(self.fn))(self.queue_streamer()) -# yield Nothing - -# class OutputBuffer: -# def __init__(self, fn): -# self.fn = fn -# self.result_queue = Queue() -# -# def compute(self, *args, **kwargs): -# for r in self.fn(*args, **kwargs): -# self.result_queue.append(r) -# -# def get_next(self): -# print(self.result_queue) -# return self.result_queue.popleft() -# -# def __bool__(self): -# return bool(self.result_queue) +def stream_queue(queue): + yield from takewhile(is_not_nothing, repeatedly(queue.popleft)) class Queue: @@ -63,6 +28,18 @@ class Queue: return bool(self.__queue) +def make_buffered_consumer(fn, buffer_size=3): + """Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the + function needs to be called m - n times with `Nothing` to return the remaining values. + """ + fn = StreamBuffer(fn, buffer_size=buffer_size) + + def consume(items): + return first(chain.from_iterable(map(fn, items))) or Nothing + + return consume + + class StreamBuffer: """Puts a function `fn` between an input and an output buffer. `fn` Needs to be a mappable generator.""" @@ -85,58 +62,3 @@ class StreamBuffer: def pop(self): return first(chain(self.result_stream, [Nothing])) - - -# class QueueBufferCoupler: -# def __init__(self, queue: Queue, buffer: StreamBuffer): -# self.queue = queue -# self.buffer = buffer -# -# def __call__(self): -# return self.compute_next() -# -# def compute_next(self): -# return next(self.compute()) -# -# def compute(self): -# yield from self.__consume_queue() -# yield from self.__flush_buffer() -# yield self.__signal_termination() -# -# def __consume_queue(self): -# queue_items = stream_queue(self.queue) -# yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items)) -# -# def __flush_buffer(self): -# yield from self.__add_to_buffer_and_consume(Nothing) -# -# def __add_to_buffer_and_consume(self, queue_item): -# self.buffer.push(queue_item) -# yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop)) -# -# @staticmethod -# def __signal_termination(): -# return Nothing - - -def make_consumer(buffer: StreamBuffer): - - def compute(items): - yield from consume(items) - yield from flush_buffer() - yield signal_termination() - - def consume(items): - yield from chain.from_iterable(map(buffer, items)) - - def flush_buffer(): - yield from buffer(Nothing) - - def signal_termination(): - return Nothing - - return compose(first, compute) - - -def stream_queue(queue): - yield from takewhile(is_not_nothing, repeatedly(queue.popleft)) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index af718d3..c8ac436 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,9 +1,10 @@ import logging +from itertools import chain from flask import Flask, jsonify, request from funcy import compose, identity -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, stream_queue, make_consumer +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, make_buffered_consumer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -42,16 +43,15 @@ class RestStreamProcessor: self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix self.queue = Queue() - # self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn)) - # self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue))) - self.fn = make_consumer(StreamBuffer(fn)) + self.buffered_consumer = make_buffered_consumer(fn) def submit(self, request): self.queue.append(request.json) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pickup(self): - result = self.fn(stream_queue(self.queue)) + items = chain(stream_queue(self.queue), [Nothing]) + result = self.buffered_consumer(items) if not valid(result): logger.error(f"Received invalid result: {result}")