diff --git a/pyinfra/rest.py b/pyinfra/rest.py index 3591fd3..0766ced 100644 --- a/pyinfra/rest.py +++ b/pyinfra/rest.py @@ -2,7 +2,7 @@ from operator import itemgetter from funcy import compose -from pyinfra.utils.func import star +from pyinfra.utils.func import star, lift, lstarlift from test.utils.server import bytes_to_string, string_to_bytes @@ -21,5 +21,9 @@ def bundle(data: bytes, metadata: dict): return package -def wrap_operation(operation): +def unpack_op_pack(operation): return compose(star(pack), star(operation), unpack) + + +def unpack_batchop_pack(operation): + return compose(lstarlift(pack), operation, lift(unpack)) diff --git a/pyinfra/utils/func.py b/pyinfra/utils/func.py index 8b7ed08..fed8161 100644 --- a/pyinfra/utils/func.py +++ b/pyinfra/utils/func.py @@ -15,6 +15,10 @@ def starlift(fn): return curry(starmap)(fn) +def lstarlift(fn): + return compose(list, starlift(fn)) + + def parallel(*fs): return lambda *args: (f(a) for f, a in zip(fs, args)) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index d1d0a96..e0a94a7 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -7,12 +7,12 @@ import fitz import pytest import requests from PIL import Image -from funcy import retry +from funcy import retry, compose from waitress import serve -from pyinfra.rest import wrap_operation +from pyinfra.rest import unpack_op_pack, unpack_batchop_pack from pyinfra.utils.buffer import bufferize -from pyinfra.utils.func import llift +from pyinfra.utils.func import llift, starlift from test.server import set_up_processing_server from test.utils.image import image_to_bytes @@ -45,10 +45,12 @@ def server(processor_fn): @pytest.fixture def processor_fn(operation, buffer_size, batched): - operation = wrap_operation(operation) - if not batched: - operation = llift(operation) + if batched: + operation = starlift(operation) + + wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack) + operation = wrapper(operation) return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json")) @@ -66,24 +68,18 @@ def operation(item_type, batched): for page in fitz.open(stream=pdf): yield page.get_pixmap().tobytes("png"), metadata - if item_type == "string": - operation = upper - elif item_type == "image": - operation = rotate - elif item_type == "pdf": - operation = stream_pages - else: + try: + return {"string": upper, "image": rotate, "pdf": stream_pages}[item_type] + except KeyError: raise ValueError(f"No operation specified for item type {item_type}") - return operation - @pytest.fixture(params=["string"]) def item_type(request): return request.param -@pytest.fixture(params=[False]) +@pytest.fixture(params=[False, True]) def batched(request): """Controls, whether the buffer processor function of the webserver is applied to batches or single items.""" return request.param