refactoring: made queued buffer coupler into a function

This commit is contained in:
Matthias Bisping 2022-05-12 17:19:52 +02:00
parent e3793e5c7c
commit 9b5fc4ff77
2 changed files with 57 additions and 25 deletions

View File

@ -87,37 +87,66 @@ class StreamBuffer:
return first(chain(self.result_stream, [Nothing]))
class QueueBufferCoupler:
def __init__(self, queue: Queue, buffer: StreamBuffer):
self.queue = queue
self.buffer = buffer
# class QueueBufferCoupler:
# def __init__(self, queue: Queue, buffer: StreamBuffer):
# self.queue = queue
# self.buffer = buffer
#
# def __call__(self):
# return self.compute_next()
#
# def compute_next(self):
# return next(self.compute())
#
# def compute(self):
# yield from self.__consume_queue()
# yield from self.__flush_buffer()
# yield self.__signal_termination()
#
# def __consume_queue(self):
# queue_items = stream_queue(self.queue)
# yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items))
#
# def __flush_buffer(self):
# yield from self.__add_to_buffer_and_consume(Nothing)
#
# def __add_to_buffer_and_consume(self, queue_item):
# self.buffer.push(queue_item)
# yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop))
#
# @staticmethod
# def __signal_termination():
# return Nothing
def __call__(self):
return self.compute_next()
def compute_next(self):
return next(self.compute())
def make_queue_consumer(buffer: StreamBuffer):
def call(queue):
return compute_next(queue)
def compute(self):
yield from self.__consume_queue()
yield from self.__flush_buffer()
yield self.__signal_termination()
def compute_next(queue):
return next(compute(queue))
def __consume_queue(self):
queue_items = stream_queue(self.queue)
yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items))
def compute(queue):
yield from consume_queue(queue)
yield from flush_buffer()
yield signal_termination()
def __flush_buffer(self):
yield from self.__add_to_buffer_and_consume(Nothing)
def consume_queue(queue):
queue_items = stream_queue(queue)
yield from chain.from_iterable(map(add_to_buffer_and_consume, queue_items))
def __add_to_buffer_and_consume(self, queue_item):
self.buffer.push(queue_item)
yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop))
def flush_buffer():
yield from add_to_buffer_and_consume(Nothing)
@staticmethod
def __signal_termination():
def signal_termination():
return Nothing
def add_to_buffer_and_consume(queue_item):
buffer.push(queue_item)
yield from takewhile(is_not_nothing, repeatedly(buffer.pop))
return call
def stream_queue(queue):
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))

View File

@ -4,7 +4,8 @@ from itertools import chain
from flask import Flask, jsonify, request
from funcy import compose, identity
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue, \
make_queue_consumer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -43,14 +44,16 @@ class RestStreamProcessor:
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.queue = Queue()
self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn))
# self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn))
# self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue)))
self.fn = make_queue_consumer(StreamBuffer(fn))
def submit(self, request):
self.queue.append(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pickup(self):
result = self.processor()
result = self.fn(self.queue)
if not valid(result):
logger.error(f"Received invalid result: {result}")