refactoring: introduced flat stream buffer class

This commit is contained in:
Matthias Bisping 2022-05-13 12:13:59 +02:00
parent bfdce62ccf
commit c09e5df23e
3 changed files with 16 additions and 15 deletions

View File

@ -2,10 +2,11 @@ import logging
from collections import deque
from itertools import chain, takewhile
from funcy import first, repeatedly
from funcy import first, repeatedly, compose, flatten
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.func import lift
logger = logging.getLogger(__name__)
@ -28,17 +29,17 @@ class Queue:
return bool(self.__queue)
def make_buffered_consumer(fn, buffer_size=3):
class FlatStreamBuffer:
"""Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the
function needs to be called m - n times with `Nothing` to return the remaining values.
"""
fn = StreamBuffer(fn, buffer_size=buffer_size)
def __init__(self, fn, buffer_size=3):
"""Function `fn` Needs to be a mappable generator."""
self.fn = lift(StreamBuffer(fn, buffer_size=buffer_size))
def consume(items):
return first(chain.from_iterable(map(fn, items))) or Nothing
return consume
def __call__(self, items):
yield from compose(flatten, self.fn)(items)
class StreamBuffer:

View File

@ -2,9 +2,9 @@ import logging
from itertools import chain
from flask import Flask, jsonify, request
from funcy import compose, identity
from funcy import compose, identity, first
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, FlatStreamBuffer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -27,21 +27,21 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched):
class LazyProcessor:
def __init__(self, buffered_consumer):
def __init__(self, stream_buffer: FlatStreamBuffer):
self.queue = Queue()
self.buffered_consumer = buffered_consumer
self.stream_buffer = stream_buffer
def push(self, item):
self.queue.append(item)
def pop(self):
items = chain(stream_queue(self.queue), [Nothing])
result = self.buffered_consumer(items)
result = first(self.stream_buffer(items)) or Nothing
return result
class LazyRestProcessor:
def __init__(self, lazy_processor, submit_suffix="submit", pickup_suffix="pickup"):
def __init__(self, lazy_processor: LazyProcessor, submit_suffix="submit", pickup_suffix="pickup"):
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.lazy_processor = lazy_processor

View File

@ -9,7 +9,7 @@ from PIL import Image
from funcy import retry
from waitress import serve
from pyinfra.server.bufferizer.lazy_bufferizer import make_buffered_consumer
from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer
from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, LazyProcessor
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
@ -38,7 +38,7 @@ def url(host, port):
@pytest.fixture
def server(processor_fn, buffer_size):
buffered_consumer = make_buffered_consumer(processor_fn, buffer_size=buffer_size)
buffered_consumer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size)
lazy_processor = LazyProcessor(buffered_consumer)
return set_up_processing_server(lazy_processor)