This commit is contained in:
Matthias Bisping 2022-05-12 14:42:42 +02:00
parent e151d2005b
commit 1a04dfb426
2 changed files with 13 additions and 8 deletions

View File

@ -63,12 +63,14 @@ class Queue:
return bool(self.__queue)
class InputOutputBuffer:
class StreamBuffer:
"""Puts a function `fn` between an input and an output buffer. `fn` Needs to be a mappable generator."""
def __init__(self, fn, buffer_size=3):
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])
def compute_next(self, item):
def push(self, item):
try:
self.result_stream = chain(self.result_stream, self.compute(item))
except TypeError as err:
@ -77,12 +79,12 @@ class InputOutputBuffer:
def compute(self, item):
yield from self.fn(item)
def get_next(self):
def pop(self):
return first(chain(self.result_stream, [Nothing]))
class QueueBufferCoupler:
def __init__(self, queue: Queue, buffer: InputOutputBuffer):
def __init__(self, queue: Queue, buffer: StreamBuffer):
self.queue = queue
self.buffer = buffer
self.result_stream = self.compute()
@ -106,8 +108,8 @@ class QueueBufferCoupler:
yield from self.__add_to_buffer_and_consume(Nothing)
def __add_to_buffer_and_consume(self, queue_item):
self.buffer.compute_next(queue_item)
yield from takewhile(is_not_nothing, repeatedly(self.buffer.get_next))
self.buffer.push(queue_item)
yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop))
@staticmethod
def __signal_termination():

View File

@ -4,7 +4,7 @@ from flask import Flask, jsonify, request
from funcy import rcompose
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, InputOutputBuffer, QueueBufferCoupler
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -36,6 +36,9 @@ class StreamProcessor:
return self.pipe(packages)
def make_streamable(operation, batched, batch_size):
operation = operation if batched else starlift(operation)
operation = StreamProcessor(operation)
@ -49,7 +52,7 @@ class RestStreamProcessor:
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.queue = Queue()
self.processor = QueueBufferCoupler(self.queue, InputOutputBuffer(fn))
self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn))
def submit(self, request):
self.queue.append(request.json)