refactoring

This commit is contained in:
Matthias Bisping 2022-05-04 10:37:46 +02:00
parent 7d8659f257
commit cf0a877569
7 changed files with 34 additions and 22 deletions

View File

View File

@ -18,14 +18,7 @@ logger.setLevel(logging.WARNING)
def post_partial(url, data: Iterable[bytes], metadata: dict): def post_partial(url, data: Iterable[bytes], metadata: dict):
"""Posts `data` to `url` and aggregates responses for each element of `data`.""" """Posts `data` to `url` and aggregates responses for each element of `data`."""
def send(method, data): send_data_with_method_to_analyzer = sender(f"{url}/submit")
response = method(url, json=data)
response.raise_for_status()
return response
dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity))
send_data_with_method_to_analyzer = starlift(send)
extract_payload_from_responses = lift(methodcaller("json"))
input_data_to_result_data = rcompose( input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer, pack_data_and_metadata_for_rest_transfer,
@ -47,15 +40,13 @@ def submit_and_pickup(url, data, metadata):
- responses must have status code 206 for more responses coming and 204 for the last response already sent - responses must have status code 206 for more responses coming and 204 for the last response already sent
""" """
def post(package): send_data_with_method_to_analyzer = sender(f"{url}/submit")
response = requests.post(f"{url}/submit", json=package)
response.raise_for_status()
return response
input_data_to_payload_stream = rcompose( input_data_to_payload_stream = rcompose(
pack_data_and_metadata_for_rest_transfer, pack_data_and_metadata_for_rest_transfer,
lift(post), dispatch_http_method_left_and_forward_data_right,
lift(methodcaller("json")), send_data_with_method_to_analyzer,
extract_payload_from_responses,
lift(itemgetter("pickup_endpoint")), lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"), lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads), lift(stream_response_payloads),
@ -65,6 +56,27 @@ def submit_and_pickup(url, data, metadata):
yield from input_data_to_payload_stream(data, repeat(metadata)) yield from input_data_to_payload_stream(data, repeat(metadata))
def sender(url):
return starlift(dispatcher(url))
def dispatcher(endpoint):
def send(method, data):
response = method(endpoint, json=data)
response.raise_for_status()
return response
return send
def dispatch_http_method_left_and_forward_data_right(*args):
return parallel_map(dispatch_methods, lift(identity))(*args)
def extract_payload_from_responses(payloads):
return map(methodcaller("json"), payloads)
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata)) yield from starmap(pack, zip(data, metadata))

View File

@ -30,10 +30,10 @@ def set_up_processing_server(process_fn):
response_payload = process_fn(request, final=request.method == "POST") response_payload = process_fn(request, final=request.method == "POST")
return jsonify(response_payload) return jsonify(response_payload)
@app.route("/submit", methods=["POST"]) @app.route("/submit", methods=["POST", "PATCH"])
def submit(): def submit():
nonlocal response_payload_iter nonlocal response_payload_iter
response_payload_iter = chain(response_payload_iter, process_fn(request, final=True)) response_payload_iter = chain(response_payload_iter, process_fn(request, final=request.method == "POST"))
return jsonify({"pickup_endpoint": "pickup"}) return jsonify({"pickup_endpoint": "pickup"})
@app.route("/pickup", methods=["GET"]) @app.route("/pickup", methods=["GET"])

View File

@ -1,7 +1,7 @@
import pytest import pytest
from funcy import lmap from funcy import lmap
from pyinfra.rest import post_partial, unpack from pyinfra.server.rest import post_partial, unpack
@pytest.mark.parametrize("batched", [True, False]) @pytest.mark.parametrize("batched", [True, False])

View File

@ -1,7 +1,7 @@
import pytest import pytest
from funcy import lmap from funcy import lmap
from pyinfra.rest import unpack, submit_and_pickup from pyinfra.server.rest import unpack, submit_and_pickup
@pytest.mark.parametrize("batched", [True, False]) @pytest.mark.parametrize("batched", [True, False])

View File

@ -5,7 +5,7 @@ import pytest
from PIL import Image from PIL import Image
from funcy import lmap, compose, flatten from funcy import lmap, compose, flatten
from pyinfra.rest import pack, normalize_item from pyinfra.server.rest import pack, normalize_item
from pyinfra.utils.func import star, lift from pyinfra.utils.func import star, lift
from test.utils.image import image_to_bytes from test.utils.image import image_to_bytes

View File

@ -7,13 +7,13 @@ import fitz
import pytest import pytest
import requests import requests
from PIL import Image from PIL import Image
from funcy import retry, compose, flatten from funcy import retry, compose
from waitress import serve from waitress import serve
from pyinfra.rest import unpack_op_pack, unpack_batchop_pack, inspect from pyinfra.server.rest import unpack_op_pack, unpack_batchop_pack
from pyinfra.utils.buffer import bufferize from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import llift, starlift from pyinfra.utils.func import llift, starlift
from test.server import set_up_processing_server from pyinfra.server.server import set_up_processing_server
from test.utils.image import image_to_bytes from test.utils.image import image_to_bytes