diff --git a/pyinfra/server/bufferizer/buffer.py b/pyinfra/server/bufferizer/buffer.py index 3fe212b..7f6e1d9 100644 --- a/pyinfra/server/bufferizer/buffer.py +++ b/pyinfra/server/bufferizer/buffer.py @@ -14,6 +14,9 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None): if item is not Nothing: buffer.append(persist_fn(item)) + # print("ITEM", item) + # print(buffer) + response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing))) return response_payload or null_value diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index dcc8f09..2ed656e 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -1,12 +1,11 @@ import logging from collections import deque -from itertools import chain, takewhile -from typing import Union, Any +from itertools import chain -from funcy import flatten, compose, repeatedly +from funcy import first -from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing -from pyinfra.utils.func import lift +from pyinfra.server.bufferizer.buffer import bufferize +from pyinfra.server.dispatcher.dispatcher import Nothing logger = logging.getLogger(__name__) @@ -21,37 +20,98 @@ class Queue: def popleft(self): return self.__queue.popleft() if self.__queue else Nothing + def __bool__(self): + return bool(self.__queue) -class QueueStreamer: - def __init__(self, queue: Queue): - self.queue = queue - def __call__(self): - return self.stream_queue() +# class QueueStreamer: +# def __init__(self, queue: Queue): +# self.queue = queue +# +# def __call__(self): +# return self.stream_queue() +# +# def stream_queue(self): +# yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft)) +# yield Nothing - def stream_queue(self): - yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft)) + +# class OutputBuffer: +# def __init__(self, fn, queue_streamer): +# self.fn = fn +# self.queue_streamer = queue_streamer +# self.result_stream = chain([]) +# +# def __call__(self): +# return self.compute_next() +# +# def compute_next(self) -> Union[Nothing, Any]: +# try: +# return next(self.result_stream) +# except StopIteration: +# self.result_stream = chain(self.result_stream, self.compute()) +# return self.compute_next() # Produces stream of `Nothing` if execution queue is empty +# except TypeError as err: +# raise TypeError("Function failed with type error. Is it mappable?") from err +# +# def compute(self): +# yield from compose(flatten, lift(self.fn))(self.queue_streamer()) +# yield Nothing + +# class OutputBuffer: +# def __init__(self, fn): +# self.fn = fn +# self.result_queue = Queue() +# +# def compute(self, *args, **kwargs): +# for r in self.fn(*args, **kwargs): +# self.result_queue.append(r) +# +# def get_next(self): +# print(self.result_queue) +# return self.result_queue.popleft() +# +# def __bool__(self): +# return bool(self.result_queue) + + +class InputOutputBuffer: + def __init__(self, fn, buffer_size=3): + self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) + self.result_stream = chain([]) + + def compute_next(self, item): + try: + self.result_stream = chain(self.result_stream, self.compute(item)) + except TypeError as err: + raise TypeError("Function failed with type error. Is it mappable?") from err + + def compute(self, item): + yield from self.fn(item) yield Nothing + def get_next(self): + return first(chain(self.result_stream, [Nothing])) -class OutputBuffer: - def __init__(self, fn, queue_streamer): - self.fn = fn - self.queue_streamer = queue_streamer - self.result_stream = chain([]) + +class QueueBufferCoupler: + def __init__(self, queue: Queue, buffer: InputOutputBuffer): + self.queue = queue + self.buffer = buffer def __call__(self): return self.compute_next() - def compute_next(self) -> Union[Nothing, Any]: - try: - return next(self.result_stream) - except StopIteration: - self.result_stream = chain(self.result_stream, self.compute()) - return self.compute_next() # Produces stream of `Nothing` if execution queue is empty - except TypeError as err: - raise TypeError("Function failed with type error. Is it mappable?") from err + def compute_next(self): + + item = self.buffer.get_next() - def compute(self): - yield from compose(flatten, lift(self.fn))(self.queue_streamer()) - yield Nothing + if item is Nothing: + + item = self.queue.popleft() + print(">>", item) + self.buffer.compute_next(item) + return self.compute_next() + + else: + return item diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index bc7d4ab..c46a83d 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -4,7 +4,7 @@ from flask import Flask, jsonify, request from funcy import rcompose from pyinfra.server.bufferizer.buffer import bufferize -from pyinfra.server.bufferizer.lazy_bufferizer import QueueStreamer, Queue, OutputBuffer +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, InputOutputBuffer, QueueBufferCoupler from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -49,16 +49,14 @@ class RestStreamProcessor: self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix self.queue = Queue() - self.queue_processor = OutputBuffer( - bufferize(fn, buffer_size=3, null_value=[]), queue_streamer=QueueStreamer(self.queue) - ) + self.processor = QueueBufferCoupler(self.queue, InputOutputBuffer(fn)) def submit(self, request): self.queue.append(request.json) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pickup(self): - result = self.queue_processor() + result = self.processor() if not valid(result): logger.error(f"Received invalid result: {result}")