diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index 2631ad5..925a2e5 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -1,24 +1,85 @@ -import json -from operator import itemgetter, methodcaller, attrgetter +from functools import partial +from itertools import takewhile, starmap, repeat, chain +from itertools import takewhile +from operator import itemgetter, methodcaller +from typing import Iterable, Tuple import pytest import requests -from funcy import compose, rpartial +from funcy import compose, rpartial, repeatedly, flatten, rcompose, take, curry, first, lmap -from pyinfra.rest import pack +from pyinfra.rest import pack, inspect, unpack +from pyinfra.utils.func import lift -@pytest.mark.parametrize("item_type", ["pdf"]) -def test_pickup_endpoint(url, server_process, pdf, metadata, operation): +def pluck_pickup_endpoints(payloads): + return map(itemgetter("pickup_endpoint"), payloads) + + +def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): + yield from starmap(pack, zip(data, metadata)) + + +def send_data_to_analyzer(url, payloads: Iterable[Tuple]): def post(package): - return requests.post(f"{url}/submit", json=package) + print("sending", package) + response = requests.post(f"{url}/submit", json=package) + response.raise_for_status() + return response - pickup = compose( - itemgetter("pickup_endpoint"), - methodcaller("json"), - post, - rpartial(pack, metadata), - )(pdf) + yield from map(post, payloads) - # while True: - # response = requests.get(f"{url}/{pickup}") + +def extract_payload_from_responses(responses): + yield from flatten(map(methodcaller("json"), responses)) + + +def submit_and_pickup(url, data, metadata): + def stream_response_payloads(endpoint): + def receive(): + response = requests.get(f"{url}/{endpoint}") + print("text", response.status_code) + return response + + def more_is_coming(response): + return response.status_code == 206 + + def load_payload(response): + print("received payload", response.json()) + return response.json() + + response_stream = takewhile(more_is_coming, repeatedly(receive)) + payloads = map(load_payload, response_stream) + for item in payloads: + print("item", item) + yield item + + # input_data_to_payload_stream = compose( + # stream_response_payloads, + # itemgetter("pickup_endpoint"), + # methodcaller("json"), + # post, + # rpartial(pack, metadata), + # ) + + print() + + input_data_to_payload_stream = rcompose( + pack_data_and_metadata_for_rest_transfer, + partial(send_data_to_analyzer, url), + extract_payload_from_responses, + pluck_pickup_endpoints, + lift(stream_response_payloads), + flatten, + ) + + yield from input_data_to_payload_stream(data, repeat(metadata)) + + +@pytest.mark.parametrize("batched", [True, False]) +@pytest.mark.parametrize("item_type", ["string"]) +def test_pickup_endpoint(url, input_data_items, metadata, operation, target_data_items, server_process): + output = lmap(unpack, submit_and_pickup(url, input_data_items, metadata)) + print("exp", lmap(unpack, target_data_items)) + print("out", output) + assert output == lmap(unpack, target_data_items) diff --git a/test/server.py b/test/server.py index 80dfb89..a4fb914 100644 --- a/test/server.py +++ b/test/server.py @@ -1,10 +1,24 @@ +import logging +import traceback +from itertools import chain + from flask import Flask, jsonify, request -from more_itertools import peekable +from funcy import flatten + +logger = logging.getLogger() + + +class Nothing: + pass + + +def has_next(peekable_iter): + return peekable_iter.peek(Nothing) == Nothing def set_up_processing_server(process_fn): app = Flask(__name__) - response_payload_iter = None + response_payload_iter = [] @app.route("/ready", methods=["GET"]) def ready(): @@ -20,17 +34,24 @@ def set_up_processing_server(process_fn): @app.route("/submit", methods=["POST"]) def submit(): nonlocal response_payload_iter - response_payload_iter = peekable(process_fn(request)) + response_payload_iter = chain(response_payload_iter, process_fn(request, final=True)) return jsonify({"pickup_endpoint": "pickup"}) @app.route("/pickup", methods=["GET"]) def pickup(): - print([*response_payload_iter]) - response_payload = next(response_payload_iter) - print("pl", response_payload) - resp = jsonify({"a": 1}) - resp.status_code = 200 + try: + response_payload = next(response_payload_iter) + print("response_payload", response_payload) + resp = jsonify(response_payload) + resp.status_code = 206 + + except StopIteration: + resp = jsonify("No more items left") + resp.status_code = 204 + + except Exception as err: + logger.error(traceback.format_exc()) return resp