From 7d8659f2576c5ea858d14ff4c4eaa4c247181cbe Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 3 May 2022 18:21:51 +0200 Subject: [PATCH] topological sorting of definitions by caller hierarchy --- pyinfra/rest.py | 151 +++++++++++++++++++++++++----------------------- 1 file changed, 78 insertions(+), 73 deletions(-) diff --git a/pyinfra/rest.py b/pyinfra/rest.py index c20a2f8..2911eef 100644 --- a/pyinfra/rest.py +++ b/pyinfra/rest.py @@ -15,65 +15,9 @@ logger = logging.getLogger("PIL.PngImagePlugin") logger.setLevel(logging.WARNING) -def pack(data: bytes, metadata: dict): - package = {"data": bytes_to_string(data), "metadata": metadata} - return package - - -def unpack(package): - data, metadata = itemgetter("data", "metadata")(package) - return string_to_bytes(data), metadata - - -def bundle(data: bytes, metadata: dict): - package = {"data": data, "metadata": metadata} - return package - - -def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]: - return chain.from_iterable(map(normalize_item, normalize_item(itr))) - - -def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable: - if isinstance(itm, tuple): - return [itm] - elif isinstance(itm, Iterable): - return itm - else: - raise UnexpectedItemType("Encountered an item that could not be normalized to a list.") - - -def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: - return compose(lstarlift(pack), normalize, star(operation), unpack) - - -def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: - return compose(lstarlift(pack), normalize, operation, lift(unpack)) - - -def inspect(msg="ins", embed=False): - def inner(x): - - if isinstance(x, Iterable) and not isinstance(x, dict): - x = list(x) - - print(msg, x) - - if embed: - import IPython - - IPython.embed() - - return x - - return inner - - -def dispatch_methods(data): - return *repeat(requests.patch, ilen(data) - 1), requests.post - - def post_partial(url, data: Iterable[bytes], metadata: dict): + """Posts `data` to `url` and aggregates responses for each element of `data`.""" + def send(method, data): response = method(url, json=data) response.raise_for_status() @@ -94,10 +38,51 @@ def post_partial(url, data: Iterable[bytes], metadata: dict): return input_data_to_result_data(data, repeat(metadata)) +def submit_and_pickup(url, data, metadata): + """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint. + + Requires: + - server must provide 'submit' endpoint + - responses must provide 'pickup_endpoint' field in JSON payload; must be suffix after root path + - responses must have status code 206 for more responses coming and 204 for the last response already sent + """ + + def post(package): + response = requests.post(f"{url}/submit", json=package) + response.raise_for_status() + return response + + input_data_to_payload_stream = rcompose( + pack_data_and_metadata_for_rest_transfer, + lift(post), + lift(methodcaller("json")), + lift(itemgetter("pickup_endpoint")), + lift(lambda ep: f"{url}/{ep}"), + lift(stream_response_payloads), + flatten, + ) + + yield from input_data_to_payload_stream(data, repeat(metadata)) + + def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): yield from starmap(pack, zip(data, metadata)) +def pack(data: bytes, metadata: dict): + package = {"data": bytes_to_string(data), "metadata": metadata} + return package + + +def unpack(package): + data, metadata = itemgetter("data", "metadata")(package) + return string_to_bytes(data), metadata + + +def dispatch_methods(data): + return *repeat(requests.patch, ilen(data) - 1), requests.post + + def stream_response_payloads(endpoint): def receive(): response = requests.get(endpoint) @@ -114,20 +99,40 @@ def stream_response_payloads(endpoint): yield from payloads -def submit_and_pickup(url, data, metadata): - def post(package): - response = requests.post(f"{url}/submit", json=package) - response.raise_for_status() - return response +def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: + return compose(lstarlift(pack), normalize, star(operation), unpack) - input_data_to_payload_stream = rcompose( - pack_data_and_metadata_for_rest_transfer, - lift(post), - lift(methodcaller("json")), - lift(itemgetter("pickup_endpoint")), - lift(lambda ep: f"{url}/{ep}"), - lift(stream_response_payloads), - flatten, - ) - yield from input_data_to_payload_stream(data, repeat(metadata)) +def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: + return compose(lstarlift(pack), normalize, operation, lift(unpack)) + + +def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]: + return chain.from_iterable(map(normalize_item, normalize_item(itr))) + + +def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable: + if isinstance(itm, tuple): + return [itm] + elif isinstance(itm, Iterable): + return itm + else: + raise UnexpectedItemType("Encountered an item that could not be normalized to a list.") + + +def inspect(msg="ins", embed=False): + def inner(x): + + if isinstance(x, Iterable) and not isinstance(x, dict): + x = list(x) + + print(msg, x) + + if embed: + import IPython + + IPython.embed() + + return x + + return inner