diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 9396b80..4c3feab 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -2,10 +2,11 @@ import logging from collections import deque from itertools import chain, takewhile -from funcy import first, repeatedly +from funcy import first, repeatedly, compose, flatten from pyinfra.server.bufferizer.buffer import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing +from pyinfra.utils.func import lift logger = logging.getLogger(__name__) @@ -28,17 +29,17 @@ class Queue: return bool(self.__queue) -def make_buffered_consumer(fn, buffer_size=3): +class FlatStreamBuffer: """Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the function needs to be called m - n times with `Nothing` to return the remaining values. """ - fn = StreamBuffer(fn, buffer_size=buffer_size) + def __init__(self, fn, buffer_size=3): + """Function `fn` Needs to be a mappable generator.""" + self.fn = lift(StreamBuffer(fn, buffer_size=buffer_size)) - def consume(items): - return first(chain.from_iterable(map(fn, items))) or Nothing - - return consume + def __call__(self, items): + yield from compose(flatten, self.fn)(items) class StreamBuffer: diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 83deb3f..420f688 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -2,9 +2,9 @@ import logging from itertools import chain from flask import Flask, jsonify, request -from funcy import compose, identity +from funcy import compose, identity, first -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, FlatStreamBuffer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -27,21 +27,21 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched): class LazyProcessor: - def __init__(self, buffered_consumer): + def __init__(self, stream_buffer: FlatStreamBuffer): self.queue = Queue() - self.buffered_consumer = buffered_consumer + self.stream_buffer = stream_buffer def push(self, item): self.queue.append(item) def pop(self): items = chain(stream_queue(self.queue), [Nothing]) - result = self.buffered_consumer(items) + result = first(self.stream_buffer(items)) or Nothing return result class LazyRestProcessor: - def __init__(self, lazy_processor, submit_suffix="submit", pickup_suffix="pickup"): + def __init__(self, lazy_processor: LazyProcessor, submit_suffix="submit", pickup_suffix="pickup"): self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix self.lazy_processor = lazy_processor diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 4f0d3cb..2307d6d 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -9,7 +9,7 @@ from PIL import Image from funcy import retry from waitress import serve -from pyinfra.server.bufferizer.lazy_bufferizer import make_buffered_consumer +from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, LazyProcessor from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes @@ -38,7 +38,7 @@ def url(host, port): @pytest.fixture def server(processor_fn, buffer_size): - buffered_consumer = make_buffered_consumer(processor_fn, buffer_size=buffer_size) + buffered_consumer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size) lazy_processor = LazyProcessor(buffered_consumer) return set_up_processing_server(lazy_processor)