From a4079f6710e13a56379173eeed2907302fe3faf4 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 4 May 2022 10:45:13 +0200 Subject: [PATCH] refactoring --- pyinfra/server/rest.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 66c0a0d..8c9c47c 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -18,13 +18,8 @@ logger.setLevel(logging.WARNING) def post_partial(url, data: Iterable[bytes], metadata: dict): """Posts `data` to `url` and aggregates responses for each element of `data`.""" - send_data_with_method_to_analyzer = sender(f"{url}/submit") - input_data_to_result_data = rcompose( - pack_data_and_metadata_for_rest_transfer, - dispatch_http_method_left_and_forward_data_right, - send_data_with_method_to_analyzer, - extract_payload_from_responses, + poster_patcher(url), flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. ) @@ -40,13 +35,8 @@ def submit_and_pickup(url, data, metadata): - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - send_data_with_method_to_analyzer = sender(f"{url}/submit") - input_data_to_payload_stream = rcompose( - pack_data_and_metadata_for_rest_transfer, - dispatch_http_method_left_and_forward_data_right, - send_data_with_method_to_analyzer, - extract_payload_from_responses, + poster_patcher(f"{url}/submit"), lift(itemgetter("pickup_endpoint")), lift(lambda ep: f"{url}/{ep}"), lift(stream_response_payloads), @@ -56,8 +46,16 @@ def submit_and_pickup(url, data, metadata): yield from input_data_to_payload_stream(data, repeat(metadata)) -def sender(url): - return starlift(dispatcher(url)) +def poster_patcher(url): + + send_data_with_method_to_analyzer = starlift(dispatcher(url)) + + return rcompose( + pack_data_and_metadata_for_rest_transfer, + dispatch_http_method_left_and_forward_data_right, + send_data_with_method_to_analyzer, + extract_payload_from_responses, + ) def dispatcher(endpoint):