diff --git a/pyinfra/rest.py b/pyinfra/rest.py index 624b6dd..3591fd3 100644 --- a/pyinfra/rest.py +++ b/pyinfra/rest.py @@ -1,8 +1,6 @@ -from collections import deque from operator import itemgetter -import flask -from funcy import repeatedly, lmap, compose +from funcy import compose from pyinfra.utils.func import star from test.utils.server import bytes_to_string, string_to_bytes @@ -25,24 +23,3 @@ def bundle(data: bytes, metadata: dict): def wrap_operation(operation): return compose(star(pack), star(operation), unpack) - - -def make_processor(operation, buffer_size=3): - def processor_fn(request: flask.Request, final=False): - buffer.append(request.json) - response_payload = lmap(operation, repeatedly(buffer.popleft, n_items_to_pop(buffer, final))) - return response_payload - - def buffer_full(current_buffer_size): - assert current_buffer_size <= buffer_size - return current_buffer_size == buffer_size - - def n_items_to_pop(buffer, final): - current_buffer_size = len(buffer) - return (final or buffer_full(current_buffer_size)) * current_buffer_size - - operation = wrap_operation(operation) - - buffer = deque() - - return processor_fn diff --git a/pyinfra/utils/buffer.py b/pyinfra/utils/buffer.py new file mode 100644 index 0000000..030f290 --- /dev/null +++ b/pyinfra/utils/buffer.py @@ -0,0 +1,23 @@ +from collections import deque +from typing import Any + +from funcy import lmap, repeatedly + + +def bufferize(fn, buffer_size=3, persist_fn=lambda x: x): + def buffered_fn(item: Any, final=False): + buffer.append(persist_fn(item)) + response_payload = lmap(fn, repeatedly(buffer.popleft, n_items_to_pop(buffer, final))) + return response_payload + + def buffer_full(current_buffer_size): + assert current_buffer_size <= buffer_size + return current_buffer_size == buffer_size + + def n_items_to_pop(buffer, final): + current_buffer_size = len(buffer) + return (final or buffer_full(current_buffer_size)) * current_buffer_size + + buffer = deque() + + return buffered_fn diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 006e9d5..6a7b6a8 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -1,6 +1,7 @@ import io import socket from multiprocessing import Process +from operator import attrgetter import fitz import pytest @@ -9,7 +10,8 @@ from PIL import Image from funcy import retry from waitress import serve -from pyinfra.rest import make_processor +from pyinfra.rest import wrap_operation +from pyinfra.utils.buffer import bufferize from test.server import set_up_processing_server from test.utils.image import image_to_bytes @@ -42,7 +44,7 @@ def server(processor_fn): @pytest.fixture def processor_fn(operation, buffer_size): - return make_processor(operation, buffer_size=buffer_size) + return bufferize(wrap_operation(operation), buffer_size=buffer_size, persist_fn=attrgetter("json")) @pytest.fixture