From 1a04dfb42652056a2d23b88552a65e59ca66552d Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 12 May 2022 14:42:42 +0200 Subject: [PATCH] renaming --- pyinfra/server/bufferizer/lazy_bufferizer.py | 14 ++++++++------ pyinfra/server/server.py | 7 +++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 629c6dc..83c2de1 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -63,12 +63,14 @@ class Queue: return bool(self.__queue) -class InputOutputBuffer: +class StreamBuffer: + """Puts a function `fn` between an input and an output buffer. `fn` Needs to be a mappable generator.""" + def __init__(self, fn, buffer_size=3): self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.result_stream = chain([]) - def compute_next(self, item): + def push(self, item): try: self.result_stream = chain(self.result_stream, self.compute(item)) except TypeError as err: @@ -77,12 +79,12 @@ class InputOutputBuffer: def compute(self, item): yield from self.fn(item) - def get_next(self): + def pop(self): return first(chain(self.result_stream, [Nothing])) class QueueBufferCoupler: - def __init__(self, queue: Queue, buffer: InputOutputBuffer): + def __init__(self, queue: Queue, buffer: StreamBuffer): self.queue = queue self.buffer = buffer self.result_stream = self.compute() @@ -106,8 +108,8 @@ class QueueBufferCoupler: yield from self.__add_to_buffer_and_consume(Nothing) def __add_to_buffer_and_consume(self, queue_item): - self.buffer.compute_next(queue_item) - yield from takewhile(is_not_nothing, repeatedly(self.buffer.get_next)) + self.buffer.push(queue_item) + yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop)) @staticmethod def __signal_termination(): diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index c46a83d..b0e1196 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -4,7 +4,7 @@ from flask import Flask, jsonify, request from funcy import rcompose from pyinfra.server.bufferizer.buffer import bufferize -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, InputOutputBuffer, QueueBufferCoupler +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -36,6 +36,9 @@ class StreamProcessor: return self.pipe(packages) + + + def make_streamable(operation, batched, batch_size): operation = operation if batched else starlift(operation) operation = StreamProcessor(operation) @@ -49,7 +52,7 @@ class RestStreamProcessor: self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix self.queue = Queue() - self.processor = QueueBufferCoupler(self.queue, InputOutputBuffer(fn)) + self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn)) def submit(self, request): self.queue.append(request.json)