refactoring: further simplififyied queue consuming

This commit is contained in:
Matthias Bisping 2022-05-12 19:06:23 +02:00
parent 8b0c2d4e07
commit 1552cd10cc
3 changed files with 20 additions and 101 deletions

View File

@ -14,9 +14,6 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None):
if item is not Nothing:
buffer.append(persist_fn(item))
# print("ITEM", item)
# print(buffer)
response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing)))
return response_payload or null_value

View File

@ -2,7 +2,7 @@ import logging
from collections import deque
from itertools import chain, takewhile
from funcy import first, repeatedly, compose
from funcy import first, repeatedly
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
@ -10,43 +10,8 @@ from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
logger = logging.getLogger(__name__)
# class OutputBuffer:
# def __init__(self, fn, queue_streamer):
# self.fn = fn
# self.queue_streamer = queue_streamer
# self.result_stream = chain([])
#
# def __call__(self):
# return self.compute_next()
#
# def compute_next(self) -> Union[Nothing, Any]:
# try:
# return next(self.result_stream)
# except StopIteration:
# self.result_stream = chain(self.result_stream, self.compute())
# return self.compute_next() # Produces stream of `Nothing` if execution queue is empty
# except TypeError as err:
# raise TypeError("Function failed with type error. Is it mappable?") from err
#
# def compute(self):
# yield from compose(flatten, lift(self.fn))(self.queue_streamer())
# yield Nothing
# class OutputBuffer:
# def __init__(self, fn):
# self.fn = fn
# self.result_queue = Queue()
#
# def compute(self, *args, **kwargs):
# for r in self.fn(*args, **kwargs):
# self.result_queue.append(r)
#
# def get_next(self):
# print(self.result_queue)
# return self.result_queue.popleft()
#
# def __bool__(self):
# return bool(self.result_queue)
def stream_queue(queue):
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))
class Queue:
@ -63,6 +28,18 @@ class Queue:
return bool(self.__queue)
def make_buffered_consumer(fn, buffer_size=3):
"""Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the
function needs to be called m - n times with `Nothing` to return the remaining values.
"""
fn = StreamBuffer(fn, buffer_size=buffer_size)
def consume(items):
return first(chain.from_iterable(map(fn, items))) or Nothing
return consume
class StreamBuffer:
"""Puts a function `fn` between an input and an output buffer. `fn` Needs to be a mappable generator."""
@ -85,58 +62,3 @@ class StreamBuffer:
def pop(self):
return first(chain(self.result_stream, [Nothing]))
# class QueueBufferCoupler:
# def __init__(self, queue: Queue, buffer: StreamBuffer):
# self.queue = queue
# self.buffer = buffer
#
# def __call__(self):
# return self.compute_next()
#
# def compute_next(self):
# return next(self.compute())
#
# def compute(self):
# yield from self.__consume_queue()
# yield from self.__flush_buffer()
# yield self.__signal_termination()
#
# def __consume_queue(self):
# queue_items = stream_queue(self.queue)
# yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items))
#
# def __flush_buffer(self):
# yield from self.__add_to_buffer_and_consume(Nothing)
#
# def __add_to_buffer_and_consume(self, queue_item):
# self.buffer.push(queue_item)
# yield from takewhile(is_not_nothing, repeatedly(self.buffer.pop))
#
# @staticmethod
# def __signal_termination():
# return Nothing
def make_consumer(buffer: StreamBuffer):
def compute(items):
yield from consume(items)
yield from flush_buffer()
yield signal_termination()
def consume(items):
yield from chain.from_iterable(map(buffer, items))
def flush_buffer():
yield from buffer(Nothing)
def signal_termination():
return Nothing
return compose(first, compute)
def stream_queue(queue):
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))

View File

@ -1,9 +1,10 @@
import logging
from itertools import chain
from flask import Flask, jsonify, request
from funcy import compose, identity
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, stream_queue, make_consumer
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, make_buffered_consumer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -42,16 +43,15 @@ class RestStreamProcessor:
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.queue = Queue()
# self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn))
# self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue)))
self.fn = make_consumer(StreamBuffer(fn))
self.buffered_consumer = make_buffered_consumer(fn)
def submit(self, request):
self.queue.append(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pickup(self):
result = self.fn(stream_queue(self.queue))
items = chain(stream_queue(self.queue), [Nothing])
result = self.buffered_consumer(items)
if not valid(result):
logger.error(f"Received invalid result: {result}")