Matthias Bisping 630ed51b27 refactoring
2022-05-04 10:52:19 +02:00

152 lines
4.2 KiB
Python

import logging
from _operator import methodcaller, itemgetter
from itertools import repeat, chain, starmap, takewhile
from operator import itemgetter
from typing import Iterable, Dict, List, Callable, Union, Tuple
import requests
from funcy import compose, ilen, identity, flatten, rcompose, repeatedly
from pyinfra.exceptions import UnexpectedItemType
from pyinfra.utils.func import star, lift, lstarlift, parallel_map, starlift
from test.utils.server import bytes_to_string, string_to_bytes
# Silence a noisy third-party logger. NOTE(review): the name targets PIL's PNG
# plugin, which logs verbosely at DEBUG — presumably PIL is pulled in
# elsewhere in this project; confirm this is the intended logger to quiet.
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.WARNING)
def post_partial(url, data: Iterable[bytes], metadata: dict):
    """Posts `data` to `url` and aggregates responses for each element of `data`.

    The same `metadata` dict is paired with every data item; responses come
    back unchanged (the receiver stage is the identity).
    """
    send = pipeline(url, identity)
    yield from send(data, repeat(metadata))
def submit_and_pickup(url, data, metadata):
    """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
    Requires:
    - server must provide 'submit' endpoint
    - responses must provide 'pickup_endpoint' field in JSON payload; must be suffix after root path
    - responses must have status code 206 for more responses coming and 204 for the last response already sent
    """
    # Receiver: read each submit-response's pickup endpoint, turn it into a
    # full URL, then stream the payloads waiting there.
    pickup_stages = (
        lift(itemgetter("pickup_endpoint")),
        lift(lambda endpoint: f"{url}/{endpoint}"),
        lift(stream_response_payloads),
    )
    receiver = rcompose(*pickup_stages)
    pipe = pipeline(f"{url}/submit", receiver)
    yield from pipe(data, repeat(metadata))
def pipeline(url, receiver):
    """Build a send -> receive -> flatten pipeline bound to `url`.

    Each analysis call returns an iterable (possibly empty, singleton, or
    multi-item), so the combined output stream is flattened at the end.
    """
    stages = (sender(url), receiver, flatten)
    return rcompose(*stages)
def sender(url):
    """Build the sending half of a pipeline for `url`.

    Stages: pack each (data, metadata) pair for transfer, pick an HTTP method
    per item, dispatch to the analyzer, then pull the JSON payloads out of
    the responses.
    """
    dispatch = starlift(dispatcher(url))
    stages = (
        pack_data_and_metadata_for_rest_transfer,
        dispatch_http_method_left_and_forward_data_right,
        dispatch,
        extract_payload_from_responses,
    )
    return rcompose(*stages)
def dispatcher(endpoint):
    """Return a `send(method, data)` callable bound to `endpoint`.

    `method` is an HTTP verb function (e.g. requests.post); `data` is sent
    as the JSON body. Raises on HTTP error statuses via `raise_for_status`.
    """
    def send(method, data):
        reply = method(endpoint, json=data)
        reply.raise_for_status()
        return reply
    return send
def dispatch_http_method_left_and_forward_data_right(*args):
    """Pair each packed payload with an HTTP method.

    Applies `dispatch_methods` on the left and the identity on the right.
    NOTE(review): the exact fan-out depends on `parallel_map` from
    pyinfra.utils.func — confirm its contract against that helper.
    """
    fan_out = parallel_map(dispatch_methods, lift(identity))
    return fan_out(*args)
def extract_payload_from_responses(payloads):
    """Lazily decode the JSON body of every response in `payloads`."""
    return (response.json() for response in payloads)
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
    """Pair each data item with its metadata and pack both for REST transfer."""
    for blob, meta in zip(data, metadata):
        yield pack(blob, meta)
def pack(data: bytes, metadata: dict):
    """Wrap binary `data` (encoded as text) and `metadata` into a JSON-serializable dict."""
    return {"data": bytes_to_string(data), "metadata": metadata}
def unpack(package):
    """Inverse of `pack`: recover the (bytes, metadata) pair from a transfer dict."""
    return string_to_bytes(package["data"]), package["metadata"]
def dispatch_methods(data):
    """Choose one HTTP method per item: PATCH for every item but the last, POST for the last.

    NOTE(review): `ilen` will exhaust `data` if it is a one-shot iterator —
    presumably callers pass a sized collection; confirm.
    """
    patches = tuple(repeat(requests.patch, ilen(data) - 1))
    return patches + (requests.post,)
def stream_response_payloads(endpoint):
    """Yield JSON payloads from `endpoint` until the server signals the end.

    Polls with GET; a 206 status means "more coming" and its body is yielded,
    any other status (e.g. 204, last response already sent) terminates the
    stream without yielding that response.
    """
    while True:
        response = requests.get(endpoint)
        if response.status_code != 206:
            break
        yield response.json()
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
    """Wrap a per-item `operation` so it consumes and produces transfer dicts.

    Pipeline (left to right): unpack the package, apply `operation` to the
    unpacked pair, normalize the result to tuples, re-pack each tuple.
    """
    return rcompose(unpack, star(operation), normalize, lstarlift(pack))
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
    """Wrap a batch `operation` so it consumes and produces lists of transfer dicts.

    Pipeline (left to right): unpack every package, apply `operation` to the
    whole batch, normalize the result to tuples, re-pack each tuple.
    """
    return rcompose(lift(unpack), operation, normalize, lstarlift(pack))
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
    """Normalize a possibly-nested result into a flat iterable of tuples.

    The outer value is normalized eagerly (so a non-iterable input raises
    right away); the per-item pass stays lazy.
    """
    outer = normalize_item(itr)
    return chain.from_iterable(normalize_item(item) for item in outer)
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
    """Normalize one item to an iterable of tuples.

    A tuple becomes a singleton list; any other iterable passes through
    unchanged (note: strings count as iterables here). Anything else raises.
    """
    if isinstance(itm, tuple):
        return [itm]
    if isinstance(itm, Iterable):
        return itm
    raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
def inspect(msg="ins", embed=False):
    """Debugging passthrough: print a value tagged with `msg` and return it.

    Non-dict iterables are materialized into a list first (so the caller gets
    back a list, not a spent iterator). With `embed=True`, drops into an
    IPython shell after printing.
    """
    def probe(value):
        if isinstance(value, Iterable) and not isinstance(value, dict):
            value = list(value)
        print(msg, value)
        if embed:
            import IPython
            IPython.embed()
        return value
    return probe