topological sorting of definitions by caller hierarchy

This commit is contained in:
Matthias Bisping 2022-05-03 18:21:51 +02:00
parent 51a6bf9875
commit 7d8659f257

View File

@ -15,65 +15,9 @@ logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.WARNING)
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
return package
def unpack(package):
data, metadata = itemgetter("data", "metadata")(package)
return string_to_bytes(data), metadata
def bundle(data: bytes, metadata: dict):
package = {"data": data, "metadata": metadata}
return package
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
if isinstance(itm, tuple):
return [itm]
elif isinstance(itm, Iterable):
return itm
else:
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
return compose(lstarlift(pack), normalize, star(operation), unpack)
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
return compose(lstarlift(pack), normalize, operation, lift(unpack))
def inspect(msg="ins", embed=False):
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict):
x = list(x)
print(msg, x)
if embed:
import IPython
IPython.embed()
return x
return inner
def dispatch_methods(data):
return *repeat(requests.patch, ilen(data) - 1), requests.post
def post_partial(url, data: Iterable[bytes], metadata: dict):
"""Posts `data` to `url` and aggregates responses for each element of `data`."""
def send(method, data):
response = method(url, json=data)
response.raise_for_status()
@ -94,10 +38,51 @@ def post_partial(url, data: Iterable[bytes], metadata: dict):
return input_data_to_result_data(data, repeat(metadata))
def submit_and_pickup(url, data, metadata):
"""Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
Requires:
- server must provide 'submit' endpoint
- responses must provide 'pickup_endpoint' field in JSON payload; must be suffix after root path
- responses must have status code 206 for more responses coming and 204 for the last response already sent
"""
def post(package):
response = requests.post(f"{url}/submit", json=package)
response.raise_for_status()
return response
input_data_to_payload_stream = rcompose(
pack_data_and_metadata_for_rest_transfer,
lift(post),
lift(methodcaller("json")),
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
flatten,
)
yield from input_data_to_payload_stream(data, repeat(metadata))
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata))
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
return package
def unpack(package):
data, metadata = itemgetter("data", "metadata")(package)
return string_to_bytes(data), metadata
def dispatch_methods(data):
return *repeat(requests.patch, ilen(data) - 1), requests.post
def stream_response_payloads(endpoint):
def receive():
response = requests.get(endpoint)
@ -114,20 +99,40 @@ def stream_response_payloads(endpoint):
yield from payloads
def submit_and_pickup(url, data, metadata):
def post(package):
response = requests.post(f"{url}/submit", json=package)
response.raise_for_status()
return response
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
return compose(lstarlift(pack), normalize, star(operation), unpack)
input_data_to_payload_stream = rcompose(
pack_data_and_metadata_for_rest_transfer,
lift(post),
lift(methodcaller("json")),
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
flatten,
)
yield from input_data_to_payload_stream(data, repeat(metadata))
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
return compose(lstarlift(pack), normalize, operation, lift(unpack))
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
if isinstance(itm, tuple):
return [itm]
elif isinstance(itm, Iterable):
return itm
else:
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
def inspect(msg="ins", embed=False):
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict):
x = list(x)
print(msg, x)
if embed:
import IPython
IPython.embed()
return x
return inner