diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 41e48da..b0a5261 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -10,26 +10,30 @@ from pyinfra.server.pipeline import Pipeline from pyinfra.server.receiver.receivers.rest import RestReceiver -def process_eagerly(url, data: Iterable[bytes], metadata: Iterable[dict]): +def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): """Posts `data` to `url` and aggregates responses for each element of `data`.""" - - pipeline = Pipeline(*pipeline_head(url), IdentityInterpreter()) - + pipeline = get_eager_pipeline(endpoint) yield from pipeline(data, metadata) -def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]): +def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint. Requires: - responses must provide return pickup_endpoint as JSON payload - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - - pipeline = Pipeline(*pipeline_head(url), rcompose(RestPickupStreamer(), RestReceiver())) - + pipeline = get_lazy_pipeline(endpoint) yield from pipeline(data, metadata) +def get_eager_pipeline(endpoint): + return Pipeline(*pipeline_head(endpoint), IdentityInterpreter()) + + +def get_lazy_pipeline(endpoint): + return Pipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver())) + + def pipeline_head(url): return RestPacker(), RestDispatcher(url), RestReceiver() diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index 560175f..0f5ae24 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -7,6 +7,6 @@ from pyinfra.server.utils import unpack @pytest.mark.parametrize("batched", [True, False]) @pytest.mark.parametrize("item_type", ["pdf", "string", "image"]) -def test_sending_partial_request(url, input_data_items, metadata, operation, target_data_items, server_process): +def test_sending_partial_request(url, input_data_items, metadata, target_data_items, server_process): output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata)) assert output == target_data_items diff --git a/test/fixtures/input.py b/test/fixtures/input.py index abb3282..e0b1faf 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -37,6 +37,14 @@ def target_data_items(input_data_items, item_type, operation, metadata): return expected +@pytest.fixture +def endpoint(url, analysis_type): + return { + "eager": f"{url}/process", + "lazy": f"{url}/submit", + }[analysis_type] + + @pytest.fixture(params=[0, 1, 5, 10]) def n_items(request): return request.param @@ -47,6 +55,16 @@ def n_pages(request): return request.param +@pytest.fixture(params=["eager", "lazy"]) +def analysis_type(request): + return request.param + + +@pytest.fixture(params=[1, 5, 90]) +def buffer_size(request): + return request.param + + def array_to_image(array) -> Image.Image: return Image.fromarray(np.uint8(array * 255), mode="RGB") @@ -55,11 +73,6 @@ def input_batch(n_items): return np.random.random_sample(size=(n_items, 3, 30, 30)) -@pytest.fixture(params=[1, 5, 90]) -def buffer_size(request): - return request.param - - def images(n_items): return lmap(compose(image_to_bytes, array_to_image), input_batch(n_items)) diff --git a/test/unit_tests/pipeline_test.py b/test/unit_tests/pipeline_test.py index c4395eb..a1a1415 100644 --- a/test/unit_tests/pipeline_test.py +++ b/test/unit_tests/pipeline_test.py @@ -1,6 +1,13 @@ -from funcy import rcompose +import pytest +from funcy import rcompose, lmap +from pyinfra.server.dispatcher.senders.rest import RestDispatcher +from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter +from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer +from pyinfra.server.packer.packers.rest import RestPacker from pyinfra.server.pipeline import Pipeline +from pyinfra.server.receiver.receivers.rest import RestReceiver +from pyinfra.server.utils import unpack from pyinfra.utils.func import lift @@ -13,3 +20,28 @@ def test_mock_pipeline(): pipeline = Pipeline(f, g, h, u) assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data)) + + +@pytest.mark.parametrize("batched", [True, False]) +@pytest.mark.parametrize("item_type", ["pdf", "string", "image"]) +def test_pipeline(pipeline, input_data_items, metadata, target_data_items): + output = lmap(unpack, pipeline(input_data_items, metadata)) + assert output == target_data_items + + +@pytest.fixture +def pipeline(rest_pipeline): + return rest_pipeline + + +@pytest.fixture +def rest_pipeline(server_process, endpoint, rest_interpreter): + return Pipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter) + + +@pytest.fixture +def rest_interpreter(analysis_type): + return { + "eager": IdentityInterpreter(), + "lazy": rcompose(RestPickupStreamer(), RestReceiver()), + }[analysis_type] diff --git a/test/unit_tests/rest/receiver_test.py b/test/unit_tests/rest/receiver_test.py index b71eefa..f1c66d3 100644 --- a/test/unit_tests/rest/receiver_test.py +++ b/test/unit_tests/rest/receiver_test.py @@ -6,7 +6,7 @@ from pyinfra.server.dispatcher.senders.rest import RestDispatcher @pytest.mark.parametrize("batched", [True, False]) @pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) -def test_rest_receiver(url, packages, server_process): - sender = RestDispatcher(f"{url}/process") +def test_rest_rest_receiver(url, packages, server_process): + dispatcher = RestDispatcher(f"{url}/process") receiver = RestReceiver() - assert all((isinstance(r, list) for r in receiver(sender(packages)))) + assert all((isinstance(r, list) for r in receiver(dispatcher(packages))))