From 89f562aa717b96b023904eeedc166447173615fb Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Mon, 16 May 2022 14:58:19 +0200 Subject: [PATCH] refactoring: move --- pyinfra/server/debugging.py | 27 +++++++ pyinfra/server/dispatcher/dispatcher.py | 4 +- pyinfra/server/normalization.py | 17 ++++ pyinfra/server/packer/packers/rest.py | 2 +- pyinfra/server/packing.py | 26 ++++++ pyinfra/server/stream/utils.py | 0 pyinfra/server/utils.py | 100 +++--------------------- test/fixtures/input.py | 3 +- test/unit_tests/server/packer_test.py | 2 +- test/unit_tests/server/pipeline_test.py | 2 +- test/unit_tests/server/utils.py | 2 +- 11 files changed, 89 insertions(+), 96 deletions(-) create mode 100644 pyinfra/server/debugging.py create mode 100644 pyinfra/server/normalization.py create mode 100644 pyinfra/server/packing.py delete mode 100644 pyinfra/server/stream/utils.py diff --git a/pyinfra/server/debugging.py b/pyinfra/server/debugging.py new file mode 100644 index 0000000..40ec62a --- /dev/null +++ b/pyinfra/server/debugging.py @@ -0,0 +1,27 @@ +from itertools import tee +from typing import Iterable + + +def inspect(prefix="inspect", embed=False): + """Can be used to inspect compositions of generator functions by placing inbetween two functions.""" + + def inner(x): + + if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple): + x, y = tee(x) + y = list(y) + else: + y = x + + l = f" {len(y)} items" if isinstance(y, list) else "" + + print(f"{prefix}{l}:", y) + + if embed: + import IPython + + IPython.embed() + + return x + + return inner diff --git a/pyinfra/server/dispatcher/dispatcher.py b/pyinfra/server/dispatcher/dispatcher.py index a9c5ba1..9aa7a44 100644 --- a/pyinfra/server/dispatcher/dispatcher.py +++ b/pyinfra/server/dispatcher/dispatcher.py @@ -19,8 +19,10 @@ def has_next(peekable_iter): class Dispatcher: def __call__(self, packages: Iterable[dict]): - packages = peekable(packages) + yield from self.dispatch_methods(packages) + def dispatch_methods(self, packages): + packages = peekable(packages) for package in packages: method = self.patch if has_next(packages) else self.post response = method(package) diff --git a/pyinfra/server/normalization.py b/pyinfra/server/normalization.py new file mode 100644 index 0000000..03ce0a7 --- /dev/null +++ b/pyinfra/server/normalization.py @@ -0,0 +1,17 @@ +from itertools import chain +from typing import Iterable, Union, Tuple + +from pyinfra.exceptions import UnexpectedItemType + + +def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]: + return chain.from_iterable(map(normalize_item, normalize_item(itr))) + + +def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable: + if isinstance(itm, tuple): + return [itm] + elif isinstance(itm, Iterable): + return itm + else: + raise UnexpectedItemType("Encountered an item that could not be normalized to a list.") diff --git a/pyinfra/server/packer/packers/rest.py b/pyinfra/server/packer/packers/rest.py index 251f523..ba59a89 100644 --- a/pyinfra/server/packer/packers/rest.py +++ b/pyinfra/server/packer/packers/rest.py @@ -1,7 +1,7 @@ from typing import Iterable from pyinfra.server.packer.packer import Packer -from pyinfra.server.utils import pack_data_and_metadata_for_rest_transfer +from pyinfra.server.packing import pack_data_and_metadata_for_rest_transfer class RestPacker(Packer): diff --git a/pyinfra/server/packing.py b/pyinfra/server/packing.py new file mode 100644 index 0000000..32624e9 --- /dev/null +++ b/pyinfra/server/packing.py @@ -0,0 +1,26 @@ +from _operator import itemgetter +from itertools import starmap +from typing import Iterable + +from funcy import compose + +from pyinfra.utils.func import starlift, lift +from test.utils.server import bytes_to_string, string_to_bytes + + +def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): + yield from starmap(pack, zip(data, metadata)) + + +def pack(data: bytes, metadata: dict): + package = {"data": bytes_to_string(data), "metadata": metadata} + return package + + +def unpack(package): + data, metadata = itemgetter("data", "metadata")(package) + return string_to_bytes(data), metadata + + +def unpack_fn_pack(fn): + return compose(starlift(pack), fn, lift(unpack)) diff --git a/pyinfra/server/stream/utils.py b/pyinfra/server/stream/utils.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py index 05dd596..e06e78d 100644 --- a/pyinfra/server/utils.py +++ b/pyinfra/server/utils.py @@ -1,97 +1,17 @@ -from _operator import itemgetter -from itertools import takewhile, starmap, repeat, chain, tee -from typing import Iterable, Callable, Dict, List, Union, Tuple +from funcy import compose, identity -import requests -from funcy import repeatedly, ilen, compose, identity - -from pyinfra.exceptions import UnexpectedItemType -from pyinfra.utils.func import lift, star, starlift -from test.utils.server import bytes_to_string, string_to_bytes - - -def stream_response_payloads(endpoint): - def receive(): - response = requests.get(endpoint) - return response - - def more_is_coming(response): - return response.status_code == 206 - - def load_payload(response): - return response.json() - - response_stream = takewhile(more_is_coming, repeatedly(receive)) - payloads = map(load_payload, response_stream) - yield from payloads - - -def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable): - yield from starmap(pack, zip(data, metadata)) - - -def pack(data: bytes, metadata: dict): - package = {"data": bytes_to_string(data), "metadata": metadata} - return package - - -def unpack(package): - data, metadata = itemgetter("data", "metadata")(package) - return string_to_bytes(data), metadata - - -def dispatch_methods(data): - return *repeat(requests.patch, ilen(data) - 1), requests.post - - -def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: - return compose(starlift(pack), normalize, operation, lift(unpack)) - - -def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]: - return chain.from_iterable(map(normalize_item, normalize_item(itr))) - - -def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable: - if isinstance(itm, tuple): - return [itm] - elif isinstance(itm, Iterable): - return itm - else: - raise UnexpectedItemType("Encountered an item that could not be normalized to a list.") - - -def inspect(msg="ins", embed=False): - def inner(x): - - if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple): - x, y = tee(x) - y = list(y) - else: - y = x - - l = len(y) if isinstance(y, list) else "" - print(msg, l, y) - - if embed: - import IPython - - IPython.embed() - - return x - - return inner - - -def make_streamable(fn, batched): - return compose(normalize, (identity if batched else starlift)(fn)) - - -def unpack_fn_pack(fn): - return compose(starlift(pack), fn, lift(unpack)) +from pyinfra.server.normalization import normalize +from pyinfra.server.packing import unpack_fn_pack +from pyinfra.utils.func import starlift def make_streamable_and_wrap_in_packing_logic(fn, batched): fn = make_streamable(fn, batched) fn = unpack_fn_pack(fn) return fn + + +def make_streamable(fn, batched): + return compose(normalize, (identity if batched else starlift)(fn)) + + diff --git a/test/fixtures/input.py b/test/fixtures/input.py index a9bd2a0..209ebd9 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -6,7 +6,8 @@ from PIL import Image from funcy import lmap, compose, flatten, lflatten from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.utils import pack, unpack, normalize_item +from pyinfra.server.normalization import normalize_item +from pyinfra.server.packing import pack, unpack from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes from test.utils.server import string_to_bytes diff --git a/test/unit_tests/server/packer_test.py b/test/unit_tests/server/packer_test.py index de389a2..f75fe87 100644 --- a/test/unit_tests/server/packer_test.py +++ b/test/unit_tests/server/packer_test.py @@ -2,7 +2,7 @@ import pytest from pyinfra.server.packer.packers.identity import IdentityPacker, bundle from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.utils import pack +from pyinfra.server.packing import pack from pyinfra.utils.func import lstarlift diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index c881b63..c15163e 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -10,7 +10,7 @@ from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStre from pyinfra.server.packer.packers.rest import RestPacker from pyinfra.server.receiver.receivers.identity import IdentityReceiver from pyinfra.server.receiver.receivers.rest import RestReceiver -from pyinfra.server.utils import unpack +from pyinfra.server.packing import unpack from pyinfra.utils.func import lift diff --git a/test/unit_tests/server/utils.py b/test/unit_tests/server/utils.py index e1f8846..a398480 100644 --- a/test/unit_tests/server/utils.py +++ b/test/unit_tests/server/utils.py @@ -2,7 +2,7 @@ import pytest from funcy import compose, lzip from pyinfra.server.packer.packers.identity import bundle -from pyinfra.server.utils import pack, unpack +from pyinfra.server.packing import pack, unpack from pyinfra.utils.func import lstarlift from test.utils.server import bytes_to_string