diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py
index 090b285..75a6897 100644
--- a/pyinfra/server/bufferizer/lazy_bufferizer.py
+++ b/pyinfra/server/bufferizer/lazy_bufferizer.py
@@ -2,7 +2,7 @@ import logging
 
 from collections import deque
 from itertools import chain, takewhile
-from funcy import first, repeatedly
+from funcy import first, repeatedly, compose
 
 from pyinfra.server.bufferizer.buffer import bufferize
 from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
@@ -119,19 +119,15 @@ class StreamBuffer:
         # return Nothing
 
 
-def make_queue_consumer(buffer: StreamBuffer):
+def make_consumer(buffer: StreamBuffer):
 
-    def compute_next(queue):
-        return next(compute(queue))
-
-    def compute(queue):
-        yield from consume_queue(queue)
+    def compute(items):
+        yield from consume(items)
         yield from flush_buffer()
         yield signal_termination()
 
-    def consume_queue(queue):
-        queue_items = stream_queue(queue)
-        yield from chain.from_iterable(map(buffer, queue_items))
+    def consume(items):
+        yield from chain.from_iterable(map(buffer, items))
 
     def flush_buffer():
         yield from buffer(Nothing)
@@ -139,7 +135,7 @@ def make_queue_consumer(buffer: StreamBuffer):
     def signal_termination():
         return Nothing
 
-    return compute_next
+    return compose(first, compute)
 
 
 def stream_queue(queue):
diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py
index f24cdeb..af718d3 100644
--- a/pyinfra/server/server.py
+++ b/pyinfra/server/server.py
@@ -3,7 +3,7 @@ import logging
 from flask import Flask, jsonify, request
 from funcy import compose, identity
 
-from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, make_queue_consumer
+from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, stream_queue, make_consumer
 from pyinfra.server.dispatcher.dispatcher import Nothing
 from pyinfra.server.utils import unpack, normalize, pack
 from pyinfra.utils.func import starlift, lift
@@ -44,14 +44,14 @@ class RestStreamProcessor:
         self.queue = Queue()
         # self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn))
         # self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue)))
-        self.fn = make_queue_consumer(StreamBuffer(fn))
+        self.fn = make_consumer(StreamBuffer(fn))
 
     def submit(self, request):
         self.queue.append(request.json)
         return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
 
     def pickup(self):
-        result = self.fn(self.queue)
+        result = self.fn(stream_queue(self.queue))
 
         if not valid(result):
             logger.error(f"Received invalid result: {result}")
diff --git a/test/fixtures/server.py b/test/fixtures/server.py
index ef6bb83..494ddc0 100644
--- a/test/fixtures/server.py
+++ b/test/fixtures/server.py
@@ -51,10 +51,14 @@ def operation_conditionally_batched(operation, batched):
 
 
 @pytest.fixture
-def operation(item_type, batched):
+def operation(item_type, batched, one_to_many):
     def upper(string: bytes, metadata):
         return string.decode().upper().encode(), metadata
 
+    def duplicate(string: bytes, metadata):
+        for _ in range(2):
+            yield upper(string, metadata)
+
     def rotate(im: bytes, metadata):
         im = Image.open(io.BytesIO(im))
         return image_to_bytes(im.rotate(90)), metadata
@@ -65,7 +69,7 @@ def operation(item_type, batched):
             yield f"page_{i}".encode(), metadata
 
     try:
-        return {"string": upper, "image": rotate, "pdf": stream_pages}[item_type]
+        return {"string": duplicate if one_to_many else upper, "image": rotate, "pdf": stream_pages}[item_type]
     except KeyError:
         raise ValueError(f"No operation specified for item type {item_type}")
 
@@ -75,6 +79,11 @@ def item_type(request):
     return request.param
 
 
+@pytest.fixture(params=[True, False])
+def one_to_many(request):
+    return request.param
+
+
 @pytest.fixture(params=[False, True])
 def batched(request):
     """Controls, whether the buffer processor function of the webserver is applied to batches or single items."""