From 7419612c212b2637986b81f5f69674aff75b4372 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 26 Apr 2022 19:54:37 +0200 Subject: [PATCH] partial request by manual receiver buffering V1 --- .../partial_response_test.py | 13 +++++- test/fixtures/input.py | 4 +- test/fixtures/server.py | 40 ++++++++++++++----- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index 9f51f95..1a61132 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -1,8 +1,9 @@ import json -from itertools import repeat, starmap +from itertools import repeat, starmap, chain import pytest import requests +from funcy import pluck from test.utils.server import bytes_to_string, string_to_bytes @@ -14,9 +15,17 @@ def test_sending_partial_request(url, data_items, metadata): package = json.dumps(package).encode() return package + def post(package): + final = str(0 if package else 1) + return requests.post(f"{url}/process", data=package, headers={"final": final}) + packages = starmap(pack, zip(repeat(metadata), data_items)) - requests.post(f"{url}/process", data=packages, stream=True) + responses = map(post, chain(packages, [""])) + payloads = (json.loads(r.json()) for r in responses) + data = map(string_to_bytes, chain.from_iterable(pluck("data", payloads))) + + assert list(data) == [b"CONTENT"] * 7 @pytest.fixture diff --git a/test/fixtures/input.py b/test/fixtures/input.py index cadc5d6..6a8670a 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -1,3 +1,5 @@ +from itertools import repeat + import pytest @@ -12,4 +14,4 @@ def data(data_type, pdf): @pytest.fixture def data_items(item_type): if item_type == "string": - return iter([b"content"] * 4) + return repeat(b"content", 7) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index b33d8e6..07a95d7 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -1,13 +1,15 @@ +import json import socket from multiprocessing import Process +import flask import pytest import requests from funcy import retry from waitress import serve from test.server import set_up_processing_server -from test.utils.server import bytes_to_string +from test.utils.server import bytes_to_string, string_to_bytes @pytest.fixture @@ -39,17 +41,35 @@ def server(processor_fn): @pytest.fixture def processor_fn(item_type, data_items): if item_type == "string": - return make_string_processor(data_items) + return make_string_processor() -def make_string_processor(data_items): - def processor_fn(payload): - print(payload.stream.read()) - # def parse(package) - # payload = json.load(payload) - # data = string_to_bytes(payload["data"]) - # # response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))} - # return response_payload +def make_string_processor(): + def processor_fn(payload: flask.Request, buffersize=3): + + final = int(payload.headers["final"]) + + if final or len(buffer) == buffersize: + + result = [bytes_to_string(itm.decode().upper().encode()) for itm in buffer] + + buffer.clear() + + else: + result = [] + + if not final: + + payload = json.loads(payload.data.decode()) + data = string_to_bytes(payload["data"]) + buffer.append(data) + + response_payload = {"data": result} + response = json.dumps(response_payload) + + return response + + buffer = [] return processor_fn