From 14d83abd7266f3a32506f0715522334b2b65d91d Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 4 May 2022 16:46:24 +0200 Subject: [PATCH] refactoring; rest sender --- pyinfra/server/packer/packers/rest.py | 12 +--- pyinfra/server/rest.py | 95 ++------------------------- pyinfra/server/sender/senders/rest.py | 30 +++++++++ 3 files changed, 37 insertions(+), 100 deletions(-) create mode 100644 pyinfra/server/sender/senders/rest.py diff --git a/pyinfra/server/packer/packers/rest.py b/pyinfra/server/packer/packers/rest.py index eccd38e..251f523 100644 --- a/pyinfra/server/packer/packers/rest.py +++ b/pyinfra/server/packer/packers/rest.py @@ -1,17 +1,7 @@ -from itertools import starmap from typing import Iterable from pyinfra.server.packer.packer import Packer -from test.utils.server import bytes_to_string - - -def pack(data: bytes, metadata: dict): - package = {"data": bytes_to_string(data), "metadata": metadata} - return package - - -def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): - yield from starmap(pack, zip(data, metadata)) +from pyinfra.server.utils import pack_data_and_metadata_for_rest_transfer class RestPacker(Packer): diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index cc8a3cb..973410c 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -1,16 +1,14 @@ import logging -from itertools import repeat, chain, takewhile, starmap -from operator import itemgetter, methodcaller -from typing import Iterable, Dict, List, Callable, Union, Tuple +from operator import itemgetter +from typing import Iterable -import requests -from funcy import compose, ilen, identity, flatten, rcompose, repeatedly +from funcy import identity, rcompose, flatten -from pyinfra.exceptions import UnexpectedItemType from pyinfra.server.packer.packers.rest import RestPacker from pyinfra.server.sender.senders.rest import RestSender -from pyinfra.utils.func import star, lift, lstarlift, parallel_map -from test.utils.server import bytes_to_string, string_to_bytes + +from pyinfra.server.utils import stream_response_payloads, extract_payload_from_responses +from pyinfra.utils.func import lift logger = logging.getLogger("PIL.PngImagePlugin") logger.setLevel(logging.WARNING) @@ -42,22 +40,6 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]): yield from pipe(data, metadata) -def stream_response_payloads(endpoint): - def receive(): - response = requests.get(endpoint) - return response - - def more_is_coming(response): - return response.status_code == 206 - - def load_payload(response): - return response.json() - - response_stream = takewhile(more_is_coming, repeatedly(receive)) - payloads = map(load_payload, response_stream) - yield from payloads - - def pipeline(url, receiver): return rcompose( head(url), @@ -80,10 +62,6 @@ def head(endpoint): return send -def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): - yield from starmap(pack, zip(data, metadata)) - - def sender(endpoint): def send(method, data): response = method(endpoint, json=data) @@ -91,64 +69,3 @@ def sender(endpoint): return response return send - - -def dispatch_http_method_left_and_forward_data_right(*args): - return parallel_map(dispatch_methods, lift(identity))(*args) - - -def extract_payload_from_responses(payloads): - return map(methodcaller("json"), payloads) - - -def pack(data: bytes, metadata: dict): - package = {"data": bytes_to_string(data), "metadata": metadata} - return package - - -def unpack(package): - data, metadata = itemgetter("data", "metadata")(package) - return string_to_bytes(data), metadata - - -def dispatch_methods(data): - return *repeat(requests.patch, ilen(data) - 1), requests.post - - -def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: - return compose(lstarlift(pack), normalize, star(operation), unpack) - - -def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: - return compose(lstarlift(pack), normalize, operation, lift(unpack)) - - -def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]: - return chain.from_iterable(map(normalize_item, normalize_item(itr))) - - -def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable: - if isinstance(itm, tuple): - return [itm] - elif isinstance(itm, Iterable): - return itm - else: - raise UnexpectedItemType("Encountered an item that could not be normalized to a list.") - - -def inspect(msg="ins", embed=False): - def inner(x): - - if isinstance(x, Iterable) and not isinstance(x, dict): - x = list(x) - - print(msg, x) - - if embed: - import IPython - - IPython.embed() - - return x - - return inner diff --git a/pyinfra/server/sender/senders/rest.py b/pyinfra/server/sender/senders/rest.py new file mode 100644 index 0000000..ba6a485 --- /dev/null +++ b/pyinfra/server/sender/senders/rest.py @@ -0,0 +1,30 @@ +from typing import Iterable + +import requests +from more_itertools import peekable + +from pyinfra.server.sender.sender import Sender + + +class Nothing: + pass + + +def has_next(peekable_iter): + return peekable_iter.peek(Nothing) != Nothing + + +class RestSender(Sender): + def __init__(self, endpoint): + self.endpoint = endpoint + print(endpoint) + + def __call__(self, packages: Iterable[dict]): + + packages = peekable(packages) + + for package in packages: + if has_next(packages): + yield requests.patch(self.endpoint, json=package) + else: + yield requests.post(self.endpoint, json=package)