pyinfra/test/exploration_tests/partial_response_test.py

88 lines
2.5 KiB
Python

import logging
from itertools import chain, starmap, tee
from operator import methodcaller, itemgetter
from typing import Iterable
import pytest
import requests
from funcy import curry, rcompose, compose, lmap, rpartial, identity
from more_itertools import peekable
from pyinfra.rest import pack
from test.utils.server import string_to_bytes
# Look up the logger named "PIL.PngImagePlugin" and set its threshold to INFO.
# NOTE(review): presumably this silences that logger's DEBUG output while the
# image-based test cases run — confirm.
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.INFO)
def lift(fn):
    """Lift an element-wise function to a function over iterables.

    ``lift(fn)(xs)`` is equivalent to ``map(fn, xs)``: it returns a lazy
    iterator that applies ``fn`` to every item of ``xs``.

    Written as a plain closure instead of ``curry(map)`` so that it does not
    rely on arity introspection of the C-implemented builtin ``map``.

    Args:
        fn: Callable taking a single item.

    Returns:
        A one-argument callable mapping an iterable to a lazy ``map`` iterator.
    """
    def lifted(iterable):
        return map(fn, iterable)

    return lifted
def starlift(fn):
    """Lift a multi-argument function to a function over iterables of tuples.

    ``starlift(fn)(rows)`` is equivalent to ``itertools.starmap(fn, rows)``:
    it returns a lazy iterator yielding ``fn(*row)`` for every ``row``.

    Written as a plain closure instead of ``curry(starmap)`` so that it does
    not rely on arity introspection of the C-implemented ``starmap``.

    Args:
        fn: Callable that accepts the unpacked elements of each row.

    Returns:
        A one-argument callable mapping an iterable of argument tuples to a
        lazy ``starmap`` iterator.
    """
    def starlifted(rows):
        return starmap(fn, rows)

    return starlifted
def parallel(*fs):
    """Pair functions with arguments positionally.

    The returned callable takes positional arguments and returns a lazy
    generator yielding ``fs[i](args[i])`` for each position. Pairing is done
    with ``zip``, so surplus functions or surplus arguments are ignored.
    """
    def apply_pairwise(*args):
        return (func(arg) for func, arg in zip(fs, args))

    return apply_pairwise
def star(f):
    """Adapt ``f`` to take its positional arguments packed in one iterable.

    ``star(f)(xs)`` calls ``f(*xs)`` and returns its result.
    """
    def call_unpacked(packed_args):
        return f(*packed_args)

    return call_unpacked
def duplicate_stream_and_apply(f1, f2):
    """Build a function that forks a stream and hands one copy to each of ``f1`` and ``f2``.

    The returned callable forwards its arguments to ``itertools.tee`` and
    applies ``f1`` to the first resulting iterator and ``f2`` to the second.
    It returns a lazy generator that yields ``f1``'s result, then ``f2``'s.
    """
    apply_to_copies = star(parallel(f1, f2))
    return compose(apply_to_copies, tee)
def parallel_map(f1, f2):
    """Run two stream transformations over copies of one stream and zip the results.

    ``f1`` and ``f2`` each receive their own copy of the input stream (not
    individual items); the streams they return are zipped into tuples:

        parallel_map :: ([a] -> [b]), ([a] -> [c]) -> [a] -> [(b, c)]
    """
    fork_and_transform = duplicate_stream_and_apply(f1, f2)
    zip_branches = star(zip)
    return compose(zip_branches, fork_and_transform)
def post_partial(url, input_data: Iterable[bytes], metadata):
    """Send ``input_data`` to ``url`` one item per request and lazily yield the results.

    Each item is combined with ``metadata`` via ``pack`` and sent in its own
    HTTP request: every item except the last is sent with ``PATCH``, the last
    one with ``POST``. The ``"data"`` entry of every JSON response is treated
    as an iterable of payloads; the payloads of all responses are chained and
    each one is converted with ``string_to_bytes``.

    Args:
        url: Endpoint that receives the requests.
        input_data: Items to send, in order.
        metadata: Passed as second argument to ``pack`` for every item.

    Returns:
        A lazy iterator over the converted payloads. No request is sent until
        the iterator is consumed; an empty ``input_data`` sends nothing.

    Raises:
        requests.HTTPError: If any response has a 4xx/5xx status code.
    """

    def send(method, data):
        response = method(url, data=data)
        # Fail here with a clear HTTP error rather than later with an opaque
        # JSON-decoding or KeyError failure on an error response.
        response.raise_for_status()
        return response

    def dispatch_method(packed_items):
        """Yield one HTTP method per item: ``patch`` until the last, then ``post``."""
        stream = peekable(packed_items)
        for _ in stream:
            # A peekable is truthy while at least one more item is pending.
            yield requests.patch if stream else requests.post

    pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata))
    # Fork the packed stream: the left branch picks the method, the right
    # branch forwards the packed data unchanged; both are zipped back together.
    dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_method, lift(identity))
    send_data_with_method_to_analyzer = starlift(send)
    extract_data_from_responses = lift(compose(itemgetter("data"), methodcaller("json")))
    flatten_buffered_payloads = chain.from_iterable
    interpret_payloads = lift(string_to_bytes)

    input_data_to_result_data = rcompose(
        pack_data_and_metadata_for_rest_transfer,
        dispatch_http_method_left_and_forward_data_right,
        send_data_with_method_to_analyzer,
        extract_data_from_responses,
        flatten_buffered_payloads,
        interpret_payloads,
    )

    return input_data_to_result_data(input_data)
@pytest.mark.parametrize("item_type", ["string", "image"])
def test_sending_partial_request(url, data_items, metadata, operation, server_process):
    """Sending the items piecewise must return ``operation`` applied to each item, in order."""
    # Compute the reference result first, then run the items through the server.
    wanted = lmap(operation, data_items)
    endpoint = f"{url}/process"
    received = list(post_partial(endpoint, data_items, metadata))
    assert received == wanted