From 531ff8d3e06ecb33b0af7dadcca67e2da86e3e4b Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 4 May 2022 16:57:08 +0200 Subject: [PATCH] refactoring; fixed sender --- pyinfra/server/sender/senders/rest.py | 9 +- pyinfra/server/utils.py | 91 +++++++++++++++++++ .../partial_response_test.py | 3 +- .../exploration_tests/pickup_endpoint_test.py | 3 +- test/fixtures/input.py | 9 +- test/fixtures/server.py | 2 +- test/unit_tests/rest/packer_test.py | 2 +- test/unit_tests/rest/sender_test.py | 14 +-- test/unit_tests/rest/utils.py | 2 +- 9 files changed, 117 insertions(+), 18 deletions(-) diff --git a/pyinfra/server/sender/senders/rest.py b/pyinfra/server/sender/senders/rest.py index ba6a485..53ecbd1 100644 --- a/pyinfra/server/sender/senders/rest.py +++ b/pyinfra/server/sender/senders/rest.py @@ -17,14 +17,13 @@ def has_next(peekable_iter): class RestSender(Sender): def __init__(self, endpoint): self.endpoint = endpoint - print(endpoint) def __call__(self, packages: Iterable[dict]): packages = peekable(packages) for package in packages: - if has_next(packages): - yield requests.patch(self.endpoint, json=package) - else: - yield requests.post(self.endpoint, json=package) + method = requests.patch if has_next(packages) else requests.post + response = method(self.endpoint, json=package) + response.raise_for_status() + yield response diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py index e69de29..ebd9606 100644 --- a/pyinfra/server/utils.py +++ b/pyinfra/server/utils.py @@ -0,0 +1,91 @@ +from _operator import methodcaller, itemgetter +from itertools import takewhile, starmap, repeat, chain +from typing import Iterable, Callable, Dict, List, Union, Tuple + +import requests +from funcy import repeatedly, identity, ilen, compose + +from pyinfra.exceptions import UnexpectedItemType +from pyinfra.utils.func import parallel_map, lift, lstarlift, star +from test.utils.server import bytes_to_string, string_to_bytes + + +def stream_response_payloads(endpoint): + def receive(): + response = requests.get(endpoint) + return response + + def more_is_coming(response): + return response.status_code == 206 + + def load_payload(response): + return response.json() + + response_stream = takewhile(more_is_coming, repeatedly(receive)) + payloads = map(load_payload, response_stream) + yield from payloads + + +def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): + yield from starmap(pack, zip(data, metadata)) + + +def dispatch_http_method_left_and_forward_data_right(*args): + return parallel_map(dispatch_methods, lift(identity))(*args) + + +def extract_payload_from_responses(payloads): + return map(methodcaller("json"), payloads) + + +def pack(data: bytes, metadata: dict): + package = {"data": bytes_to_string(data), "metadata": metadata} + return package + + +def unpack(package): + data, metadata = itemgetter("data", "metadata")(package) + return string_to_bytes(data), metadata + + +def dispatch_methods(data): + return *repeat(requests.patch, ilen(data) - 1), requests.post + + +def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: + return compose(lstarlift(pack), normalize, star(operation), unpack) + + +def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: + return compose(lstarlift(pack), normalize, operation, lift(unpack)) + + +def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]: + return chain.from_iterable(map(normalize_item, normalize_item(itr))) + + +def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable: + if isinstance(itm, tuple): + return [itm] + elif isinstance(itm, Iterable): + return itm + else: + raise UnexpectedItemType("Encountered an item that could not be normalized to a list.") + + +def inspect(msg="ins", embed=False): + def inner(x): + + if isinstance(x, Iterable) and not isinstance(x, dict): + x = list(x) + + print(msg, x) + + if embed: + import IPython + + IPython.embed() + + return x + + return inner diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index 0ed71a1..560175f 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -1,7 +1,8 @@ import pytest from funcy import lmap -from pyinfra.server.rest import process_eagerly, unpack +from pyinfra.server.rest import process_eagerly +from pyinfra.server.utils import unpack @pytest.mark.parametrize("batched", [True, False]) diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index d78e0d5..b5b2c9e 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -1,7 +1,8 @@ import pytest from funcy import lmap -from pyinfra.server.rest import unpack, process_lazily +from pyinfra.server.rest import process_lazily +from pyinfra.server.utils import unpack @pytest.mark.parametrize("batched", [True, False]) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 10247f4..abb3282 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -5,8 +5,8 @@ import pytest from PIL import Image from funcy import lmap, compose, flatten -from pyinfra.server.rest import pack, normalize_item, unpack -from pyinfra.utils.func import star, lift +from pyinfra.server.utils import pack, unpack, normalize_item +from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes @@ -67,3 +67,8 @@ def images(n_items): @pytest.fixture def metadata(n_items): return list(repeat({"dummy": True}, n_items)) + + +@pytest.fixture +def packages(input_data_items, metadata): + return lstarlift(pack)(zip(input_data_items, metadata)) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 0c99bbf..81255bb 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -10,7 +10,7 @@ from PIL import Image from funcy import retry, compose from waitress import serve -from pyinfra.server.rest import unpack_op_pack, unpack_batchop_pack +from pyinfra.server.utils import unpack_op_pack, unpack_batchop_pack from pyinfra.utils.buffer import bufferize from pyinfra.utils.func import llift, starlift from pyinfra.server.server import set_up_processing_server diff --git a/test/unit_tests/rest/packer_test.py b/test/unit_tests/rest/packer_test.py index 5f6098b..de389a2 100644 --- a/test/unit_tests/rest/packer_test.py +++ b/test/unit_tests/rest/packer_test.py @@ -2,7 +2,7 @@ import pytest from pyinfra.server.packer.packers.identity import IdentityPacker, bundle from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.rest import pack +from pyinfra.server.utils import pack from pyinfra.utils.func import lstarlift diff --git a/test/unit_tests/rest/sender_test.py b/test/unit_tests/rest/sender_test.py index 2fa0d3d..c6a6d07 100644 --- a/test/unit_tests/rest/sender_test.py +++ b/test/unit_tests/rest/sender_test.py @@ -1,8 +1,10 @@ -def test_identity_sender(data, metadata): - packer = IdentityPacker() - assert list(packer(data, metadata)) == lstarlift(bundle)(zip(data, metadata)) +import pytest + +from pyinfra.server.sender.senders.rest import RestSender -def test_rest_packer(data, metadata): - packer = RestPacker() - assert list(packer(data, metadata)) == lstarlift(pack)(zip(data, metadata)) +@pytest.mark.parametrize("batched", [True, False]) +@pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) +def test_rest_packer(url, packages, server_process): + sender = RestSender(f"{url}/process") + assert all([r.status_code == 200 for r in sender(packages)]) diff --git a/test/unit_tests/rest/utils.py b/test/unit_tests/rest/utils.py index ce24fb6..e1f8846 100644 --- a/test/unit_tests/rest/utils.py +++ b/test/unit_tests/rest/utils.py @@ -2,7 +2,7 @@ import pytest from funcy import compose, lzip from pyinfra.server.packer.packers.identity import bundle -from pyinfra.server.rest import unpack, pack +from pyinfra.server.utils import pack, unpack from pyinfra.utils.func import lstarlift from test.utils.server import bytes_to_string