pyinfra/test/exploration_tests/pickup_endpoint_test.py
Matthias Bisping 85d7ad52dc refactoring
2022-05-03 18:02:53 +02:00

69 lines
2.0 KiB
Python

from functools import partial
from itertools import starmap, repeat
from itertools import takewhile
from operator import itemgetter, methodcaller
from typing import Iterable, Tuple
import pytest
import requests
from funcy import repeatedly, flatten, rcompose, lmap
from pyinfra.rest import pack, unpack
from pyinfra.utils.func import lift
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
    """Lazily pack each data item together with its metadata entry.

    The two iterables are walked in lockstep with ``zip``, so iteration ends
    as soon as the shorter one is exhausted.

    Args:
        data: Iterable of data items.
        metadata: Iterable of metadata entries, aligned with ``data``.

    Yields:
        The result of ``pack(item, item_metadata)`` for every pair.
    """
    for item, item_metadata in zip(data, metadata):
        yield pack(item, item_metadata)
def send_data_to_analyzer(url, payloads: Iterable[Tuple]):
    """Lazily POST every payload to the ``/submit`` route below ``url``.

    Nothing is sent until the returned generator is consumed; each payload is
    posted only when its response is requested.

    Args:
        url: Base URL; ``/submit`` is appended to it.
        payloads: Iterable of packages, each sent as the JSON request body.

    Yields:
        The ``requests`` response object of each POST, in payload order.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` for a failed POST.
    """
    for payload in payloads:
        reply = requests.post(f"{url}/submit", json=payload)
        # Fail fast instead of passing an error response downstream.
        reply.raise_for_status()
        yield reply
def stream_response_payloads(endpoint):
    """Poll ``endpoint`` with GET requests and yield each JSON payload.

    The endpoint is requested repeatedly; as long as the response status is
    206 its decoded JSON body is yielded. The first response with any other
    status ends the stream, and that response's body is not yielded.

    Args:
        endpoint: Full URL to poll.

    Yields:
        The value of ``response.json()`` for every 206 response.
    """
    # NOTE(review): any non-206 status (including 4xx/5xx) ends the stream
    # without raising -- confirm that this is the intended protocol.
    while True:
        reply = requests.get(endpoint)
        if reply.status_code == 206:
            yield reply.json()
        else:
            return
def submit_and_pickup(url, data, metadata):
    """Submit data items and stream back the payloads from their pickup endpoints.

    Stages, applied in order:
      1. pack every data item with ``metadata`` (the same metadata object is
         repeated for each item),
      2. POST each package to ``{url}/submit``, raising on a failed request,
      3. decode each response as JSON and read its ``"pickup_endpoint"`` key,
      4. build the pickup URL ``{url}/{pickup_endpoint}``,
      5. stream the payloads from each pickup URL,
      6. flatten the per-submission streams into one stream.

    Args:
        url: Base URL of the service.
        data: Iterable of data items to submit.
        metadata: Metadata paired with every data item.

    Yields:
        The flattened payloads produced by ``stream_response_payloads``.
    """
    # NOTE(review): ``lift`` presumably turns a per-item function into one
    # that maps over a stream -- confirm against pyinfra.utils.func.

    def submit(package):
        reply = requests.post(f"{url}/submit", json=package)
        reply.raise_for_status()
        return reply

    def to_pickup_url(pickup_endpoint):
        return f"{url}/{pickup_endpoint}"

    packages = pack_data_and_metadata_for_rest_transfer(data, repeat(metadata))
    replies = lift(submit)(packages)
    reply_bodies = lift(methodcaller("json"))(replies)
    pickup_endpoints = lift(itemgetter("pickup_endpoint"))(reply_bodies)
    pickup_urls = lift(to_pickup_url)(pickup_endpoints)
    payload_streams = lift(stream_response_payloads)(pickup_urls)
    yield from flatten(payload_streams)
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_pickup_endpoint(url, input_data_items, metadata, operation, target_data_items, server_process):
    """Check that submitted items come back from the pickup endpoint as expected.

    The input items are submitted with ``submit_and_pickup``; every received
    payload is unpacked and the result is compared with the unpacked target
    items.
    """
    # NOTE(review): ``batched`` and ``item_type`` are parametrized but not in
    # the signature, and ``operation`` / ``server_process`` are unused in the
    # body -- presumably all are consumed by fixtures; confirm in conftest.
    received = [unpack(payload) for payload in submit_and_pickup(url, input_data_items, metadata)]
    expected = [unpack(target) for target in target_data_items]
    assert received == expected