added batching wrapper for internally batching functions

This commit is contained in:
Matthias Bisping 2022-04-29 10:35:55 +02:00
parent bc0e9ed643
commit 00276dbcc7
3 changed files with 22 additions and 18 deletions

View File

@ -2,7 +2,7 @@ from operator import itemgetter
from funcy import compose
from pyinfra.utils.func import star
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.server import bytes_to_string, string_to_bytes
@ -21,5 +21,9 @@ def bundle(data: bytes, metadata: dict):
return package
def wrap_operation(operation):
def unpack_op_pack(operation):
return compose(star(pack), star(operation), unpack)
def unpack_batchop_pack(operation):
return compose(lstarlift(pack), operation, lift(unpack))

View File

@ -15,6 +15,10 @@ def starlift(fn):
return curry(starmap)(fn)
def lstarlift(fn):
return compose(list, starlift(fn))
def parallel(*fs):
return lambda *args: (f(a) for f, a in zip(fs, args))

View File

@ -7,12 +7,12 @@ import fitz
import pytest
import requests
from PIL import Image
from funcy import retry
from funcy import retry, compose
from waitress import serve
from pyinfra.rest import wrap_operation
from pyinfra.rest import unpack_op_pack, unpack_batchop_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import llift
from pyinfra.utils.func import llift, starlift
from test.server import set_up_processing_server
from test.utils.image import image_to_bytes
@ -45,10 +45,12 @@ def server(processor_fn):
@pytest.fixture
def processor_fn(operation, buffer_size, batched):
operation = wrap_operation(operation)
if not batched:
operation = llift(operation)
if batched:
operation = starlift(operation)
wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
operation = wrapper(operation)
return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json"))
@ -66,24 +68,18 @@ def operation(item_type, batched):
for page in fitz.open(stream=pdf):
yield page.get_pixmap().tobytes("png"), metadata
if item_type == "string":
operation = upper
elif item_type == "image":
operation = rotate
elif item_type == "pdf":
operation = stream_pages
else:
try:
return {"string": upper, "image": rotate, "pdf": stream_pages}[item_type]
except KeyError:
raise ValueError(f"No operation specified for item type {item_type}")
return operation
@pytest.fixture(params=["string"])
def item_type(request):
return request.param
@pytest.fixture(params=[False])
@pytest.fixture(params=[False, True])
def batched(request):
"""Controls, whether the buffer processor function of the webserver is applied to batches or single items."""
return request.param