From 3b0d0868b9b2c955b7698d9e8f920b9f9da78d30 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 28 Apr 2022 13:01:01 +0200 Subject: [PATCH] refactoring: method dispatch via peekable rather than special empty data request --- pyinfra/rest.py | 38 ++++++++++++- .../partial_response_test.py | 54 ++++++++++++++++--- test/fixtures/server.py | 37 +------------ test/server.py | 10 ++-- 4 files changed, 90 insertions(+), 49 deletions(-) diff --git a/pyinfra/rest.py b/pyinfra/rest.py index f2c90fe..18d6a57 100644 --- a/pyinfra/rest.py +++ b/pyinfra/rest.py @@ -1,9 +1,43 @@ import json +from _operator import itemgetter +from collections import deque -from test.utils.server import bytes_to_string +import flask +from funcy import repeatedly, lmap, compose + +from test.utils.server import bytes_to_string, string_to_bytes def pack(data: bytes, metadata: dict): package = {"data": bytes_to_string(data), "metadata": metadata} package = json.dumps(package).encode() - return package \ No newline at end of file + return package + + +def make_processor(operation, buffer_size=3): + def processor_fn(request: flask.Request, final=False): + + data = get_data_from_request(request) + + buffer.append(data) + + items = repeatedly(buffer.popleft, len(buffer)) if consume_buffer_now(buffer, data, final) else [] + + result = lmap(compose(bytes_to_string, operation), items) + + response_payload = {"data": result} + return response_payload + + def consume_buffer_now(buffer, data, final): + current_buffer_size = len(buffer) + assert current_buffer_size <= buffer_size + + buffer_is_full = current_buffer_size >= buffer_size + last_request_has_been_sent = not data + return final or buffer_is_full or last_request_has_been_sent + + get_data_from_request = compose(string_to_bytes, itemgetter("data"), json.loads, lambda p: p.data.decode()) + + buffer = deque() + + return processor_fn diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index 70f5b88..bcb6841 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -1,11 +1,12 @@ import logging -from itertools import chain +from itertools import chain, starmap, tee from operator import methodcaller, itemgetter from typing import Iterable import pytest import requests -from funcy import curry, rcompose, compose, lmap, rpartial +from funcy import curry, rcompose, compose, lmap, rpartial, identity +from more_itertools import peekable from pyinfra.rest import pack from test.utils.server import string_to_bytes @@ -18,25 +19,64 @@ def lift(fn): return curry(map)(fn) +def starlift(fn): + return curry(starmap)(fn) + + +def parallel(*fs): + return lambda *args: (f(a) for f, a in zip(fs, args)) + + +def star(f): + return lambda x: f(*x) + + +def duplicate_stream_and_apply(f1, f2): + return compose(star(parallel(f1, f2)), tee) + + +def parallel_map(f1, f2): + """Applies functions to a stream in parallel and yields a stream of tuples: + parallel_map :: a -> b, a -> c -> [a] -> [(b, c)] + """ + return compose(star(zip), duplicate_stream_and_apply(f1, f2)) + + def post_partial(url, input_data: Iterable[bytes], metadata): - def post(data): - return requests.post(url, data=data) + def send(method, data): + return method(url, data=data) + + def dispatch_method(input_data): + def is_last_item(): + try: + input_data.peek() + return False + except StopIteration: + return True + + input_data = peekable(input_data) + + for _ in input_data: + method = requests.post if is_last_item() else requests.patch + yield method pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata)) - send_packages_to_analyzer_and_receive_responses = lift(post) + dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_method, lift(identity)) + send_data_with_method_to_analyzer = starlift(send) extract_data_from_responses = lift(compose(itemgetter("data"), methodcaller("json"))) flatten_buffered_payloads = chain.from_iterable interpret_payloads = lift(string_to_bytes) input_data_to_result_data = rcompose( pack_data_and_metadata_for_rest_transfer, - send_packages_to_analyzer_and_receive_responses, + dispatch_http_method_left_and_forward_data_right, + send_data_with_method_to_analyzer, extract_data_from_responses, flatten_buffered_payloads, interpret_payloads, ) - return input_data_to_result_data((*input_data, b"")) + return input_data_to_result_data(input_data) @pytest.mark.parametrize("item_type", ["string", "image"]) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index da1ba6e..c374130 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -1,21 +1,17 @@ import io -import json import socket -from collections import deque from multiprocessing import Process -from operator import itemgetter import fitz -import flask import pytest import requests from PIL import Image -from funcy import retry, compose, repeatedly, lmap +from funcy import retry from waitress import serve +from pyinfra.rest import make_processor from test.server import set_up_processing_server from test.utils.image import image_to_bytes -from test.utils.server import bytes_to_string, string_to_bytes @pytest.fixture @@ -79,35 +75,6 @@ def buffer_size(request): return request.param -def make_processor(operation, buffer_size=3): - def processor_fn(request: flask.Request): - - data = get_data_from_request(request) - - items = repeatedly(buffer.popleft, len(buffer)) if consume_buffer_now(buffer, data) else [] - - result = lmap(compose(bytes_to_string, operation), items) - - buffer.append(data) - - response_payload = {"data": result} - return response_payload - - def consume_buffer_now(buffer, data): - current_buffer_size = len(buffer) - assert current_buffer_size <= buffer_size - - buffer_is_full = current_buffer_size >= buffer_size - last_request_has_been_sent = not data - return buffer_is_full or last_request_has_been_sent - - get_data_from_request = compose(string_to_bytes, itemgetter("data"), json.loads, lambda p: p.data.decode()) - - buffer = deque() - - return processor_fn - - @pytest.fixture def host_and_port(host, port): return {"host": host, "port": port} diff --git a/test/server.py b/test/server.py index 327aa06..80dfb89 100644 --- a/test/server.py +++ b/test/server.py @@ -12,22 +12,22 @@ def set_up_processing_server(process_fn): resp.status_code = 200 return resp - @app.route("/process", methods=["POST"]) + @app.route("/process", methods=["POST", "PATCH"]) def process(): - response_payload = process_fn(request) + response_payload = process_fn(request, final=request.method == "POST") return jsonify(response_payload) @app.route("/submit", methods=["POST"]) def submit(): nonlocal response_payload_iter - response_payload_iter = peekable(iter(process_fn(request))) + response_payload_iter = peekable(process_fn(request)) return jsonify({"pickup_endpoint": "pickup"}) @app.route("/pickup", methods=["GET"]) def pickup(): - + print([*response_payload_iter]) response_payload = next(response_payload_iter) - print(response_payload) + print("pl", response_payload) resp = jsonify({"a": 1}) resp.status_code = 200