refactoring: metadata argument as iterable instead of dict

This commit is contained in:
Matthias Bisping 2022-05-04 14:55:04 +02:00
parent 35045128b4
commit a301876ab9
4 changed files with 16 additions and 16 deletions

View File

@ -14,13 +14,13 @@ logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.WARNING)
def process_eagerly(url, data: Iterable[bytes], metadata: dict):
def process_eagerly(url, data: Iterable[bytes], metadata: Iterable[dict]):
"""Posts `data` to `url` and aggregates responses for each element of `data`."""
pipe = pipeline(url, identity)
yield from pipe(data, repeat(metadata))
yield from pipe(data, metadata)
def process_lazily(url, data, metadata):
def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]):
"""Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
Requires:
@ -37,7 +37,7 @@ def process_lazily(url, data, metadata):
pipe = pipeline(f"{url}/submit", receiver)
yield from pipe(data, repeat(metadata))
yield from pipe(data, metadata)
def stream_response_payloads(endpoint):
@ -58,16 +58,16 @@ def stream_response_payloads(endpoint):
def pipeline(url, receiver):
return rcompose(
sender(url),
head(url),
receiver,
flatten, # each analysis call returns an iterable. Can be empty, singleton, or multi-item. Hence, flatten.
)
def sender(endpoint):
def head(endpoint):
"""Builds a function that sends post or patch requests to an endpoint."""
send_data_with_method_to_analyzer = starlift(http_method_dispatcher(endpoint))
send_data_with_method_to_analyzer = starlift(sender(endpoint))
def send(data: Iterable[bytes], metadata: Iterable[dict]):
"""Sends packages of data and metadata to endpoint and returns response."""
@ -85,7 +85,7 @@ def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable)
yield from starmap(pack, zip(data, metadata))
def http_method_dispatcher(endpoint):
def sender(endpoint):
def send(method, data):
response = method(endpoint, json=data)
response.raise_for_status()

View File

@ -8,4 +8,4 @@ from pyinfra.server.rest import process_eagerly, unpack
@pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
def test_sending_partial_request(url, input_data_items, metadata, operation, target_data_items, server_process):
output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata))
assert output == lmap(unpack, target_data_items)
assert output == target_data_items

View File

@ -8,4 +8,4 @@ from pyinfra.server.rest import unpack, process_lazily
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_pickup_endpoint(url, input_data_items, metadata, operation, target_data_items, server_process):
output = lmap(unpack, process_lazily(url, input_data_items, metadata))
assert output == lmap(unpack, target_data_items)
assert output == target_data_items

View File

@ -1,11 +1,11 @@
from functools import partial
from itertools import starmap, repeat
import numpy as np
import pytest
from PIL import Image
from funcy import lmap, compose, flatten
from pyinfra.server.rest import pack, normalize_item
from pyinfra.server.rest import pack, normalize_item, unpack
from pyinfra.utils.func import star, lift
from test.utils.image import image_to_bytes
@ -32,8 +32,8 @@ def input_data_items(item_type, n_items, pdf):
@pytest.fixture
def target_data_items(input_data_items, item_type, operation, metadata):
op = compose(lift(star(pack)), normalize_item, partial(operation, metadata=metadata))
expected = list(flatten(map(op, input_data_items)))
op = compose(lift(star(pack)), normalize_item, operation)
expected = lmap(unpack, flatten(starmap(op, zip(input_data_items, metadata))))
return expected
@ -65,5 +65,5 @@ def images(n_items):
@pytest.fixture
def metadata():
return {"dummy": True}
def metadata(n_items):
return list(repeat({"dummy": True}, n_items))