diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index 7973a67..7d9ac83 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -12,10 +12,6 @@ from pyinfra.rest import pack, unpack from pyinfra.utils.func import lift -def pluck_pickup_endpoints(payloads): - return map(itemgetter("pickup_endpoint"), payloads) - - def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): yield from starmap(pack, zip(data, metadata)) @@ -29,31 +25,35 @@ def send_data_to_analyzer(url, payloads: Iterable[Tuple]): yield from map(post, payloads) -def extract_payload_from_responses(responses): - yield from flatten(map(methodcaller("json"), responses)) +def stream_response_payloads(endpoint): + def receive(): + response = requests.get(endpoint) + return response + + def more_is_coming(response): + return response.status_code == 206 + + def load_payload(response): + return response.json() + + response_stream = takewhile(more_is_coming, repeatedly(receive)) + payloads = map(load_payload, response_stream) + yield from payloads def submit_and_pickup(url, data, metadata): - def stream_response_payloads(endpoint): - def receive(): - response = requests.get(f"{url}/{endpoint}") - return response - def more_is_coming(response): - return response.status_code == 206 - - def load_payload(response): - return response.json() - - response_stream = takewhile(more_is_coming, repeatedly(receive)) - payloads = map(load_payload, response_stream) - yield from payloads + def post(package): + response = requests.post(f"{url}/submit", json=package) + response.raise_for_status() + return response input_data_to_payload_stream = rcompose( pack_data_and_metadata_for_rest_transfer, - partial(send_data_to_analyzer, url), - extract_payload_from_responses, - pluck_pickup_endpoints, + lift(post), + lift(methodcaller("json")), + lift(itemgetter("pickup_endpoint")), + lift(lambda ep: f"{url}/{ep}"), lift(stream_response_payloads), flatten, )