diff --git a/pyinfra/server/__init__.py b/pyinfra/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/rest.py b/pyinfra/server/rest.py similarity index 85% rename from pyinfra/rest.py rename to pyinfra/server/rest.py index 2911eef..66c0a0d 100644 --- a/pyinfra/rest.py +++ b/pyinfra/server/rest.py @@ -18,14 +18,7 @@ logger.setLevel(logging.WARNING) def post_partial(url, data: Iterable[bytes], metadata: dict): """Posts `data` to `url` and aggregates responses for each element of `data`.""" - def send(method, data): - response = method(url, json=data) - response.raise_for_status() - return response - - dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity)) - send_data_with_method_to_analyzer = starlift(send) - extract_payload_from_responses = lift(methodcaller("json")) + send_data_with_method_to_analyzer = sender(f"{url}/submit") input_data_to_result_data = rcompose( pack_data_and_metadata_for_rest_transfer, @@ -47,15 +40,13 @@ def submit_and_pickup(url, data, metadata): - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - def post(package): - response = requests.post(f"{url}/submit", json=package) - response.raise_for_status() - return response + send_data_with_method_to_analyzer = sender(f"{url}/submit") input_data_to_payload_stream = rcompose( pack_data_and_metadata_for_rest_transfer, - lift(post), - lift(methodcaller("json")), + dispatch_http_method_left_and_forward_data_right, + send_data_with_method_to_analyzer, + extract_payload_from_responses, lift(itemgetter("pickup_endpoint")), lift(lambda ep: f"{url}/{ep}"), lift(stream_response_payloads), @@ -65,6 +56,27 @@ def submit_and_pickup(url, data, metadata): yield from input_data_to_payload_stream(data, repeat(metadata)) +def sender(url): + return starlift(dispatcher(url)) + + +def dispatcher(endpoint): + def send(method, data): + response = method(endpoint, json=data) + response.raise_for_status() + return response + + return send + + +def dispatch_http_method_left_and_forward_data_right(*args): + return parallel_map(dispatch_methods, lift(identity))(*args) + + +def extract_payload_from_responses(payloads): + return map(methodcaller("json"), payloads) + + def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): yield from starmap(pack, zip(data, metadata)) diff --git a/test/server.py b/pyinfra/server/server.py similarity index 92% rename from test/server.py rename to pyinfra/server/server.py index 849d7e6..694044c 100644 --- a/test/server.py +++ b/pyinfra/server/server.py @@ -30,10 +30,10 @@ def set_up_processing_server(process_fn): response_payload = process_fn(request, final=request.method == "POST") return jsonify(response_payload) - @app.route("/submit", methods=["POST"]) + @app.route("/submit", methods=["POST", "PATCH"]) def submit(): nonlocal response_payload_iter - response_payload_iter = chain(response_payload_iter, process_fn(request, final=True)) + response_payload_iter = chain(response_payload_iter, process_fn(request, final=request.method == "POST")) return jsonify({"pickup_endpoint": "pickup"}) @app.route("/pickup", methods=["GET"]) diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index af8747c..95a2677 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -1,7 +1,7 @@ import pytest from funcy import lmap -from pyinfra.rest import post_partial, unpack +from pyinfra.server.rest import post_partial, unpack @pytest.mark.parametrize("batched", [True, False]) diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index bd60de4..c73e0e8 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -1,7 +1,7 @@ import pytest from funcy import lmap -from pyinfra.rest import unpack, submit_and_pickup +from pyinfra.server.rest import unpack, submit_and_pickup @pytest.mark.parametrize("batched", [True, False]) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 0e8a70c..0c8ca60 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -5,7 +5,7 @@ import pytest from PIL import Image from funcy import lmap, compose, flatten -from pyinfra.rest import pack, normalize_item +from pyinfra.server.rest import pack, normalize_item from pyinfra.utils.func import star, lift from test.utils.image import image_to_bytes diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 5c66599..0c99bbf 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -7,13 +7,13 @@ import fitz import pytest import requests from PIL import Image -from funcy import retry, compose, flatten +from funcy import retry, compose from waitress import serve -from pyinfra.rest import unpack_op_pack, unpack_batchop_pack, inspect +from pyinfra.server.rest import unpack_op_pack, unpack_batchop_pack from pyinfra.utils.buffer import bufferize from pyinfra.utils.func import llift, starlift -from test.server import set_up_processing_server +from pyinfra.server.server import set_up_processing_server from test.utils.image import image_to_bytes