From fd572616311a82a7800a55aa3ace24a9351ecc1f Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Fri, 29 Apr 2022 15:43:20 +0200 Subject: [PATCH] signature harminization for 1 -> 1 and 1 -> n WIP --- pyinfra/rest.py | 37 ++++++++++++++----- .../partial_response_test.py | 23 ++++++++---- .../exploration_tests/pickup_endpoint_test.py | 25 +++++++------ test/fixtures/server.py | 6 +-- 4 files changed, 59 insertions(+), 32 deletions(-) diff --git a/pyinfra/rest.py b/pyinfra/rest.py index 845292f..b44eedf 100644 --- a/pyinfra/rest.py +++ b/pyinfra/rest.py @@ -1,8 +1,8 @@ -from itertools import chain +from itertools import chain, tee from operator import itemgetter +from typing import Iterable -from funcy import compose, first, rcompose -from more_itertools import flatten +from funcy import compose, first, rcompose, flatten from pyinfra.utils.func import star, lift, lstarlift, starlift from test.utils.server import bytes_to_string, string_to_bytes @@ -24,13 +24,30 @@ def bundle(data: bytes, metadata: dict): def unpack_op_pack(operation): - return compose(first, lstarlift(pack), star(operation), unpack) + return compose(inspect("A2"), flatten, inspect("A1"), lstarlift(pack), star(operation), unpack) def unpack_batchop_pack(operation): - return rcompose( - lift(unpack), # unpack the buffer items - operation, # apply operation on unpacked items - flatten, # operations may be 1 -> 1, 1 -> n or n -> 1, hence flatten - lstarlift(pack), - ) + raise BrokenPipeError + # return rcompose( + # lift(unpack), # unpack the buffer items + # operation, # apply operation on unpacked items + # flatten, # operations may be 1 -> 1, 1 -> n or n -> 1, hence flatten + # lstarlift(pack), + # ) + + +def inspect(msg="ins"): + def inner(x): + + if isinstance(x, Iterable) and not isinstance(x, dict): + print(11111111111111111111) + x = list(x) + else: + print("00000000000") + + print(msg, x) + + return x + + return inner diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index 1f73c09..84fa242 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -7,8 +7,9 @@ from typing import Iterable import pytest import requests from funcy import rcompose, compose, rpartial, identity, lmap, ilen, first +from more_itertools import flatten -from pyinfra.rest import pack, unpack, bundle +from pyinfra.rest import pack, unpack, bundle, inspect from pyinfra.utils.func import lift, starlift, parallel_map, star, lstarlift logger = logging.getLogger("PIL.PngImagePlugin") @@ -26,25 +27,33 @@ def post_partial(url, input_data: Iterable[bytes], metadata): pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata)) dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity)) send_data_with_method_to_analyzer = starlift(send) - extract_payload_from_responses = lift(methodcaller("json")) - flatten_buffered_payloads = chain.from_iterable - interpret_payloads = lift(compose(star(bundle), unpack)) + extract_payload_from_responses = compose(flatten, lift(methodcaller("json"))) + flatten_buffered_payloads = flatten input_data_to_result_data = rcompose( pack_data_and_metadata_for_rest_transfer, dispatch_http_method_left_and_forward_data_right, send_data_with_method_to_analyzer, + inspect("B"), extract_payload_from_responses, + inspect("C"), flatten_buffered_payloads, - interpret_payloads, + inspect("D"), ) return input_data_to_result_data(input_data) -@pytest.mark.parametrize("item_type", ["string", "image"]) +@pytest.mark.parametrize("item_type", ["string"]) def test_sending_partial_request(url, data_items, metadata, operation, server_process): - expected = lmap(compose(first, lstarlift(bundle), partial(operation, metadata=metadata)), data_items) + op = compose(lstarlift(pack), partial(operation, metadata=metadata)) + expected = list(flatten(map(op, data_items))) + print() + print("exp") + print(expected) output = list(post_partial(f"{url}/process", data_items, metadata)) + print() + print("out") + print(output) assert output == expected diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index 4bc651b..74149f7 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -1,5 +1,5 @@ import json -from operator import itemgetter, methodcaller +from operator import itemgetter, methodcaller, attrgetter import pytest import requests @@ -9,17 +9,18 @@ from pyinfra.rest import pack @pytest.mark.parametrize("item_type", ["pdf"]) -def test_sending_partial_request(url, server_process, pdf, metadata, operation): +def test_pickup_endpoint(url, server_process, pdf, metadata, operation): + def post(package): + return requests.post(f"{url}/submit", json=package) - def post(data): - return requests.post(f"{url}/submit", data=data) - - pickup = compose(itemgetter("pickup_endpoint"), methodcaller("json"), post, rpartial(pack, metadata))(pdf) + pickup = compose( + itemgetter("pickup_endpoint"), + methodcaller("json"), + post, + rpartial(pack, metadata), + )(pdf) print(pickup) - while True: - response = requests.get(f"{url}/{pickup}") - print(response) - - - + # while True: + # response = requests.get(f"{url}/{pickup}") + # print(response) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index e0a94a7..bf5940a 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -58,11 +58,11 @@ def processor_fn(operation, buffer_size, batched): @pytest.fixture def operation(item_type, batched): def upper(string: bytes, metadata): - return string.decode().upper().encode(), metadata + return [(string.decode().upper().encode(), metadata)] def rotate(im: bytes, metadata): im = Image.open(io.BytesIO(im)) - return image_to_bytes(im.rotate(90)), metadata + return [(image_to_bytes(im.rotate(90)), metadata)] def stream_pages(pdf: bytes, metadata): for page in fitz.open(stream=pdf): @@ -79,7 +79,7 @@ def item_type(request): return request.param -@pytest.fixture(params=[False, True]) +@pytest.fixture(params=[False]) def batched(request): """Controls, whether the buffer processor function of the webserver is applied to batches or single items.""" return request.param