import logging from functools import partial from itertools import chain from operator import methodcaller from typing import Iterable import pytest import requests from funcy import rcompose, compose, rpartial, identity, lmap from more_itertools import peekable from pyinfra.rest import pack, unpack, bundle from pyinfra.utils.func import lift, starlift, parallel_map, star logger = logging.getLogger("PIL.PngImagePlugin") logger.setLevel(logging.INFO) def post_partial(url, input_data: Iterable[bytes], metadata): def send(method, data): return method(url, json=data) def dispatch_method(input_data): def is_last_item(): try: input_data.peek() return False except StopIteration: return True input_data = peekable(input_data) for _ in input_data: method = requests.post if is_last_item() else requests.patch yield method pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata)) dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_method, lift(identity)) send_data_with_method_to_analyzer = starlift(send) extract_payload_from_responses = lift(methodcaller("json")) flatten_buffered_payloads = chain.from_iterable interpret_payloads = lift(compose(star(bundle), unpack)) input_data_to_result_data = rcompose( pack_data_and_metadata_for_rest_transfer, dispatch_http_method_left_and_forward_data_right, send_data_with_method_to_analyzer, extract_payload_from_responses, flatten_buffered_payloads, interpret_payloads, ) return input_data_to_result_data(input_data) @pytest.mark.parametrize("item_type", ["string", "image"]) def test_sending_partial_request(url, data_items, metadata, operation, server_process): expected = lmap(compose(star(bundle), partial(operation, metadata=metadata)), data_items) output = list(post_partial(f"{url}/process", data_items, metadata)) assert output == expected