diff --git a/pyinfra/utils/buffer.py b/pyinfra/utils/buffer.py index 030f290..7127a12 100644 --- a/pyinfra/utils/buffer.py +++ b/pyinfra/utils/buffer.py @@ -1,13 +1,13 @@ from collections import deque from typing import Any -from funcy import lmap, repeatedly +from funcy import repeatedly def bufferize(fn, buffer_size=3, persist_fn=lambda x: x): def buffered_fn(item: Any, final=False): buffer.append(persist_fn(item)) - response_payload = lmap(fn, repeatedly(buffer.popleft, n_items_to_pop(buffer, final))) + response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, final))) return response_payload def buffer_full(current_buffer_size): diff --git a/pyinfra/utils/func.py b/pyinfra/utils/func.py index 059bfb3..8b7ed08 100644 --- a/pyinfra/utils/func.py +++ b/pyinfra/utils/func.py @@ -7,6 +7,10 @@ def lift(fn): return curry(map)(fn) +def llift(fn): + return compose(list, lift(fn)) + + def starlift(fn): return curry(starmap)(fn) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 6a7b6a8..e7c77b4 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -12,6 +12,7 @@ from waitress import serve from pyinfra.rest import wrap_operation from pyinfra.utils.buffer import bufferize +from pyinfra.utils.func import lift, llift from test.server import set_up_processing_server from test.utils.image import image_to_bytes @@ -43,12 +44,21 @@ def server(processor_fn): @pytest.fixture -def processor_fn(operation, buffer_size): - return bufferize(wrap_operation(operation), buffer_size=buffer_size, persist_fn=attrgetter("json")) +def processor_fn(operation, buffer_size, batched): + + fn = wrap_operation(operation) + + if not batched: + fn = llift(fn) + + return bufferize(fn, buffer_size=buffer_size, persist_fn=attrgetter("json")) @pytest.fixture def operation(item_type): + def upper(string: bytes, metadata): + return string.decode().upper().encode(), metadata + def rotate(im: bytes, metadata): im = Image.open(io.BytesIO(im)) return image_to_bytes(im.rotate(90)), metadata @@ -57,9 +67,6 @@ def operation(item_type): for page in fitz.open(stream=pdf): yield page.get_pixmap().tobytes("png"), metadata - def upper(string: bytes, metadata): - return string.decode().upper().encode(), metadata - if item_type == "string": return upper elif item_type == "image": @@ -75,6 +82,11 @@ def item_type(request): return request.param +@pytest.fixture(params=[False]) +def batched(request): + """Controls, whether the buffer processor function of the webserver is applied to batches or single items.""" + return request.param + @pytest.fixture(params=[1, 3, 7, 100]) def buffer_size(request): return request.param