pyinfra/test/exploration_tests/pickup_endpoint_test.py
2022-05-03 17:52:13 +02:00

86 lines
2.6 KiB
Python

from functools import partial
from itertools import takewhile, starmap, repeat, chain
from itertools import takewhile
from operator import itemgetter, methodcaller
from typing import Iterable, Tuple
import pytest
import requests
from funcy import compose, rpartial, repeatedly, flatten, rcompose, take, curry, first, lmap
from pyinfra.rest import pack, inspect, unpack
from pyinfra.utils.func import lift
def pluck_pickup_endpoints(payloads):
return map(itemgetter("pickup_endpoint"), payloads)
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata))
def send_data_to_analyzer(url, payloads: Iterable[Tuple]):
def post(package):
print("sending", package)
response = requests.post(f"{url}/submit", json=package)
response.raise_for_status()
return response
yield from map(post, payloads)
def extract_payload_from_responses(responses):
yield from flatten(map(methodcaller("json"), responses))
def submit_and_pickup(url, data, metadata):
def stream_response_payloads(endpoint):
def receive():
response = requests.get(f"{url}/{endpoint}")
print("text", response.status_code)
return response
def more_is_coming(response):
return response.status_code == 206
def load_payload(response):
print("received payload", response.json())
return response.json()
response_stream = takewhile(more_is_coming, repeatedly(receive))
payloads = map(load_payload, response_stream)
for item in payloads:
print("item", item)
yield item
# input_data_to_payload_stream = compose(
# stream_response_payloads,
# itemgetter("pickup_endpoint"),
# methodcaller("json"),
# post,
# rpartial(pack, metadata),
# )
print()
input_data_to_payload_stream = rcompose(
pack_data_and_metadata_for_rest_transfer,
partial(send_data_to_analyzer, url),
extract_payload_from_responses,
pluck_pickup_endpoints,
lift(stream_response_payloads),
flatten,
)
yield from input_data_to_payload_stream(data, repeat(metadata))
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["string"])
def test_pickup_endpoint(url, input_data_items, metadata, operation, target_data_items, server_process):
output = lmap(unpack, submit_and_pickup(url, input_data_items, metadata))
print("exp", lmap(unpack, target_data_items))
print("out", output)
assert output == lmap(unpack, target_data_items)