refactoring

This commit is contained in:
Matthias Bisping 2022-05-04 10:45:13 +02:00
parent cf0a877569
commit a4079f6710

View File

@ -18,13 +18,8 @@ logger.setLevel(logging.WARNING)
def post_partial(url, data: Iterable[bytes], metadata: dict):
"""Posts `data` to `url` and aggregates responses for each element of `data`."""
send_data_with_method_to_analyzer = sender(f"{url}/submit")
input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
extract_payload_from_responses,
poster_patcher(url),
flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten.
)
@ -40,13 +35,8 @@ def submit_and_pickup(url, data, metadata):
- responses must have status code 206 for more responses coming and 204 for the last response already sent
"""
send_data_with_method_to_analyzer = sender(f"{url}/submit")
input_data_to_payload_stream = rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
extract_payload_from_responses,
poster_patcher(f"{url}/submit"),
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
@ -56,8 +46,16 @@ def submit_and_pickup(url, data, metadata):
yield from input_data_to_payload_stream(data, repeat(metadata))
def sender(url):
return starlift(dispatcher(url))
def poster_patcher(url):
send_data_with_method_to_analyzer = starlift(dispatcher(url))
return rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
extract_payload_from_responses,
)
def dispatcher(endpoint):