import logging
from itertools import chain, starmap, tee
from operator import methodcaller, itemgetter
from typing import Iterable

import pytest
import requests
from funcy import curry, rcompose, compose, lmap, rpartial, identity
from more_itertools import peekable

from pyinfra.rest import pack
from test.utils.server import string_to_bytes

# Set the PIL PNG plugin logger to INFO (presumably to keep its debug
# output out of the test logs -- confirm if this matters).
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.INFO)


def lift(fn):
    """Turn ``fn`` into a function over streams: ``lift(fn)(xs)`` is ``map(fn, xs)``."""
    curried_map = curry(map)
    return curried_map(fn)


def starlift(fn):
    """Like ``lift`` but argument-unpacking: ``starlift(fn)(xs)`` is ``starmap(fn, xs)``."""
    curried_starmap = curry(starmap)
    return curried_starmap(fn)


def parallel(*fs):
    """Return a function applying the i-th function in ``fs`` to its i-th argument.

    The returned function yields the results lazily. Functions and arguments
    are paired with ``zip``, so any surplus on either side is ignored.
    """

    def apply_pairwise(*args):
        return (func(arg) for func, arg in zip(fs, args))

    return apply_pairwise


def star(f):
    """Return a one-argument function that calls ``f`` with its argument unpacked."""

    def call_unpacked(packed_args):
        return f(*packed_args)

    return call_unpacked


def duplicate_stream_and_apply(f1, f2):
    """Return a function that tees a stream and feeds one copy to ``f1``, the other to ``f2``.

    The result is an iterable holding the two outputs, ``f1``'s first.
    """
    apply_to_copies = star(parallel(f1, f2))
    return rcompose(tee, apply_to_copies)


def parallel_map(f1, f2):
    """Run two stream functions over copies of one stream and zip their outputs.

    parallel_map :: a -> b, a -> c -> [a] -> [(b, c)]
    """
    zip_branches = star(zip)
    return rcompose(duplicate_stream_and_apply(f1, f2), zip_branches)


def post_partial(url, input_data: Iterable[bytes], metadata):
    """Send ``input_data`` item by item to ``url`` and return the decoded response payloads.

    Each item is combined with ``metadata`` through ``pack`` and sent in its
    own request: ``requests.patch`` for every item except the final one,
    which goes out via ``requests.post``. From each response the ``"data"``
    entry of the JSON body is taken, the per-response collections are chained
    into one stream and every element is passed through ``string_to_bytes``.

    The whole pipeline is lazy; requests are issued as the result is consumed.
    """

    def request(http_method, payload):
        return http_method(url, data=payload)

    def choose_methods(items):
        # A peekable is truthy while elements remain, so it is falsy exactly
        # when the element just consumed was the last one.
        stream = peekable(items)
        for _ in stream:
            yield requests.patch if stream else requests.post

    stages = (
        # item -> packed request body (item + metadata)
        lift(rpartial(pack, metadata)),
        # packed body -> (http method, packed body)
        parallel_map(choose_methods, lift(identity)),
        # (http method, packed body) -> response
        starlift(request),
        # response -> response.json()["data"]
        lift(rcompose(methodcaller("json"), itemgetter("data"))),
        # flatten the per-response payload collections into one stream
        chain.from_iterable,
        # payload -> string_to_bytes(payload)
        lift(string_to_bytes),
    )
    pipeline = rcompose(*stages)

    return pipeline(input_data)


@pytest.mark.parametrize("item_type", ["string", "image"])
def test_sending_partial_request(url, data_items, metadata, operation, server_process):
    """Results returned by ``post_partial`` must equal ``operation`` applied to each item."""
    endpoint = f"{url}/process"

    # Build the reference result first, exactly as many items as were sent.
    expected_results = lmap(operation, data_items)
    received = list(post_partial(endpoint, data_items, metadata))

    assert received == expected_results