diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 86c60da..5d7ba46 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -14,13 +14,13 @@ logger = logging.getLogger("PIL.PngImagePlugin") logger.setLevel(logging.WARNING) -def process_eagerly(url, data: Iterable[bytes], metadata: dict): +def process_eagerly(url, data: Iterable[bytes], metadata: Iterable[dict]): """Posts `data` to `url` and aggregates responses for each element of `data`.""" pipe = pipeline(url, identity) - yield from pipe(data, repeat(metadata)) + yield from pipe(data, metadata) -def process_lazily(url, data, metadata): +def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]): """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint. Requires: @@ -37,7 +37,7 @@ def process_lazily(url, data, metadata): pipe = pipeline(f"{url}/submit", receiver) - yield from pipe(data, repeat(metadata)) + yield from pipe(data, metadata) def stream_response_payloads(endpoint): @@ -58,16 +58,16 @@ def stream_response_payloads(endpoint): def pipeline(url, receiver): return rcompose( - sender(url), + head(url), receiver, flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. ) -def sender(endpoint): +def head(endpoint): """Builds a function that sends post or patch requests an endpoint.""" - send_data_with_method_to_analyzer = starlift(http_method_dispatcher(endpoint)) + send_data_with_method_to_analyzer = starlift(sender(endpoint)) def send(data: Iterable[bytes], metadata: Iterable[dict]): """Sends packages of data and metadata to endpoint and returns response.""" @@ -85,7 +85,7 @@ def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable) yield from starmap(pack, zip(data, metadata)) -def http_method_dispatcher(endpoint): +def sender(endpoint): def send(method, data): response = method(endpoint, json=data) response.raise_for_status() diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index f60f918..0ed71a1 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -8,4 +8,4 @@ from pyinfra.server.rest import process_eagerly, unpack @pytest.mark.parametrize("item_type", ["pdf", "string", "image"]) def test_sending_partial_request(url, input_data_items, metadata, operation, target_data_items, server_process): output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata)) - assert output == lmap(unpack, target_data_items) + assert output == target_data_items diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index 516c05a..d78e0d5 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -8,4 +8,4 @@ from pyinfra.server.rest import unpack, process_lazily @pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) def test_pickup_endpoint(url, input_data_items, metadata, operation, target_data_items, server_process): output = lmap(unpack, process_lazily(url, input_data_items, metadata)) - assert output == lmap(unpack, target_data_items) + assert output == target_data_items diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 0c8ca60..10247f4 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -1,11 +1,11 @@ -from functools import partial +from itertools import starmap, repeat import numpy as np import pytest from PIL import Image from funcy import lmap, compose, flatten -from pyinfra.server.rest import pack, normalize_item +from pyinfra.server.rest import pack, normalize_item, unpack from pyinfra.utils.func import star, lift from test.utils.image import image_to_bytes @@ -32,8 +32,8 @@ def input_data_items(item_type, n_items, pdf): @pytest.fixture def target_data_items(input_data_items, item_type, operation, metadata): - op = compose(lift(star(pack)), normalize_item, partial(operation, metadata=metadata)) - expected = list(flatten(map(op, input_data_items))) + op = compose(lift(star(pack)), normalize_item, operation) + expected = lmap(unpack, flatten(starmap(op, zip(input_data_items, metadata)))) return expected @@ -65,5 +65,5 @@ def images(n_items): @pytest.fixture -def metadata(): - return {"dummy": True} +def metadata(n_items): + return list(repeat({"dummy": True}, n_items))