pyinfra/test/exploration_tests/partial_response_test.py

62 lines
2.0 KiB
Python

import logging
from functools import partial
from itertools import chain
from operator import methodcaller
from typing import Iterable
import pytest
import requests
from funcy import rcompose, compose, rpartial, identity, lmap
from more_itertools import peekable
from pyinfra.rest import pack, unpack, bundle
from pyinfra.utils.func import lift, starlift, parallel_map, star
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.INFO)
def post_partial(url, input_data: Iterable[bytes], metadata):
def send(method, data):
return method(url, json=data)
def dispatch_method(input_data):
def is_last_item():
try:
input_data.peek()
return False
except StopIteration:
return True
input_data = peekable(input_data)
for _ in input_data:
method = requests.post if is_last_item() else requests.patch
yield method
pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata))
dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_method, lift(identity))
send_data_with_method_to_analyzer = starlift(send)
extract_payload_from_responses = lift(methodcaller("json"))
flatten_buffered_payloads = chain.from_iterable
interpret_payloads = lift(compose(star(bundle), unpack))
input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
extract_payload_from_responses,
flatten_buffered_payloads,
interpret_payloads,
)
return input_data_to_result_data(input_data)
@pytest.mark.parametrize("item_type", ["string", "image"])
def test_sending_partial_request(url, data_items, metadata, operation, server_process):
expected = lmap(compose(star(bundle), partial(operation, metadata=metadata)), data_items)
output = list(post_partial(f"{url}/process", data_items, metadata))
assert output == expected