From 51a6bf9875c66c62fab0cbc2929edcddd7bc562e Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 3 May 2022 18:06:28 +0200 Subject: [PATCH] refactoring --- pyinfra/rest.py | 48 +++++++++++++-- .../exploration_tests/pickup_endpoint_test.py | 61 +------------------ 2 files changed, 45 insertions(+), 64 deletions(-) diff --git a/pyinfra/rest.py b/pyinfra/rest.py index 0d2f715..c20a2f8 100644 --- a/pyinfra/rest.py +++ b/pyinfra/rest.py @@ -1,11 +1,11 @@ import logging -from _operator import methodcaller -from itertools import repeat, chain +from _operator import methodcaller, itemgetter +from itertools import repeat, chain, starmap, takewhile from operator import itemgetter from typing import Iterable, Dict, List, Callable, Union, Tuple import requests -from funcy import compose, ilen, rpartial, identity, flatten, rcompose +from funcy import compose, ilen, identity, flatten, rcompose, repeatedly from pyinfra.exceptions import UnexpectedItemType from pyinfra.utils.func import star, lift, lstarlift, parallel_map, starlift @@ -79,7 +79,6 @@ def post_partial(url, data: Iterable[bytes], metadata: dict): response.raise_for_status() return response - pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata)) dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity)) send_data_with_method_to_analyzer = starlift(send) extract_payload_from_responses = lift(methodcaller("json")) @@ -92,4 +91,43 @@ def post_partial(url, data: Iterable[bytes], metadata: dict): flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. ) - return input_data_to_result_data(data) + return input_data_to_result_data(data, repeat(metadata)) + + +def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): + yield from starmap(pack, zip(data, metadata)) + + +def stream_response_payloads(endpoint): + def receive(): + response = requests.get(endpoint) + return response + + def more_is_coming(response): + return response.status_code == 206 + + def load_payload(response): + return response.json() + + response_stream = takewhile(more_is_coming, repeatedly(receive)) + payloads = map(load_payload, response_stream) + yield from payloads + + +def submit_and_pickup(url, data, metadata): + def post(package): + response = requests.post(f"{url}/submit", json=package) + response.raise_for_status() + return response + + input_data_to_payload_stream = rcompose( + pack_data_and_metadata_for_rest_transfer, + lift(post), + lift(methodcaller("json")), + lift(itemgetter("pickup_endpoint")), + lift(lambda ep: f"{url}/{ep}"), + lift(stream_response_payloads), + flatten, + ) + + yield from input_data_to_payload_stream(data, repeat(metadata)) diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index 7d9ac83..bd60de4 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -1,64 +1,7 @@ -from functools import partial -from itertools import starmap, repeat -from itertools import takewhile -from operator import itemgetter, methodcaller -from typing import Iterable, Tuple - import pytest -import requests -from funcy import repeatedly, flatten, rcompose, lmap +from funcy import lmap -from pyinfra.rest import pack, unpack -from pyinfra.utils.func import lift - - -def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): - yield from starmap(pack, zip(data, metadata)) - - -def send_data_to_analyzer(url, payloads: Iterable[Tuple]): - def post(package): - response = requests.post(f"{url}/submit", json=package) - response.raise_for_status() - return response - - yield from map(post, payloads) - - -def stream_response_payloads(endpoint): - def receive(): - response = requests.get(endpoint) - return response - - def more_is_coming(response): - return response.status_code == 206 - - def load_payload(response): - return response.json() - - response_stream = takewhile(more_is_coming, repeatedly(receive)) - payloads = map(load_payload, response_stream) - yield from payloads - - -def submit_and_pickup(url, data, metadata): - - def post(package): - response = requests.post(f"{url}/submit", json=package) - response.raise_for_status() - return response - - input_data_to_payload_stream = rcompose( - pack_data_and_metadata_for_rest_transfer, - lift(post), - lift(methodcaller("json")), - lift(itemgetter("pickup_endpoint")), - lift(lambda ep: f"{url}/{ep}"), - lift(stream_response_payloads), - flatten, - ) - - yield from input_data_to_payload_stream(data, repeat(metadata)) +from pyinfra.rest import unpack, submit_and_pickup @pytest.mark.parametrize("batched", [True, False])