From 630ed51b27e4df35e74ac64e4f890bdf7e1fb4d9 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 4 May 2022 10:52:19 +0200 Subject: [PATCH] refactoring --- pyinfra/server/rest.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 8c9c47c..4ddcbe8 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -17,13 +17,8 @@ logger.setLevel(logging.WARNING) def post_partial(url, data: Iterable[bytes], metadata: dict): """Posts `data` to `url` and aggregates responses for each element of `data`.""" - - input_data_to_result_data = rcompose( - poster_patcher(url), - flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. - ) - - return input_data_to_result_data(data, repeat(metadata)) + pipe = pipeline(url, identity) + yield from pipe(data, repeat(metadata)) def submit_and_pickup(url, data, metadata): @@ -35,18 +30,26 @@ def submit_and_pickup(url, data, metadata): - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - input_data_to_payload_stream = rcompose( - poster_patcher(f"{url}/submit"), + receiver = rcompose( lift(itemgetter("pickup_endpoint")), lift(lambda ep: f"{url}/{ep}"), lift(stream_response_payloads), - flatten, ) - yield from input_data_to_payload_stream(data, repeat(metadata)) + pipe = pipeline(f"{url}/submit", receiver) + + yield from pipe(data, repeat(metadata)) -def poster_patcher(url): +def pipeline(url, receiver): + return rcompose( + sender(url), + receiver, + flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. + ) + + +def sender(url): send_data_with_method_to_analyzer = starlift(dispatcher(url))