refactoring: move
This commit is contained in:
parent
1074f44b30
commit
89f562aa71
27
pyinfra/server/debugging.py
Normal file
27
pyinfra/server/debugging.py
Normal file
@ -0,0 +1,27 @@
|
||||
from itertools import tee
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
def inspect(prefix="inspect", embed=False):
|
||||
"""Can be used to inspect compositions of generator functions by placing inbetween two functions."""
|
||||
|
||||
def inner(x):
|
||||
|
||||
if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple):
|
||||
x, y = tee(x)
|
||||
y = list(y)
|
||||
else:
|
||||
y = x
|
||||
|
||||
l = f" {len(y)} items" if isinstance(y, list) else ""
|
||||
|
||||
print(f"{prefix}{l}:", y)
|
||||
|
||||
if embed:
|
||||
import IPython
|
||||
|
||||
IPython.embed()
|
||||
|
||||
return x
|
||||
|
||||
return inner
|
||||
@ -19,8 +19,10 @@ def has_next(peekable_iter):
|
||||
class Dispatcher:
|
||||
def __call__(self, packages: Iterable[dict]):
|
||||
|
||||
packages = peekable(packages)
|
||||
yield from self.dispatch_methods(packages)
|
||||
|
||||
def dispatch_methods(self, packages):
|
||||
packages = peekable(packages)
|
||||
for package in packages:
|
||||
method = self.patch if has_next(packages) else self.post
|
||||
response = method(package)
|
||||
|
||||
17
pyinfra/server/normalization.py
Normal file
17
pyinfra/server/normalization.py
Normal file
@ -0,0 +1,17 @@
|
||||
from itertools import chain
|
||||
from typing import Iterable, Union, Tuple
|
||||
|
||||
from pyinfra.exceptions import UnexpectedItemType
|
||||
|
||||
|
||||
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
|
||||
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
|
||||
|
||||
|
||||
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
|
||||
if isinstance(itm, tuple):
|
||||
return [itm]
|
||||
elif isinstance(itm, Iterable):
|
||||
return itm
|
||||
else:
|
||||
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
|
||||
@ -1,7 +1,7 @@
|
||||
from typing import Iterable
|
||||
|
||||
from pyinfra.server.packer.packer import Packer
|
||||
from pyinfra.server.utils import pack_data_and_metadata_for_rest_transfer
|
||||
from pyinfra.server.packing import pack_data_and_metadata_for_rest_transfer
|
||||
|
||||
|
||||
class RestPacker(Packer):
|
||||
|
||||
26
pyinfra/server/packing.py
Normal file
26
pyinfra/server/packing.py
Normal file
@ -0,0 +1,26 @@
|
||||
from _operator import itemgetter
|
||||
from itertools import starmap
|
||||
from typing import Iterable
|
||||
|
||||
from funcy import compose
|
||||
|
||||
from pyinfra.utils.func import starlift, lift
|
||||
from test.utils.server import bytes_to_string, string_to_bytes
|
||||
|
||||
|
||||
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
|
||||
yield from starmap(pack, zip(data, metadata))
|
||||
|
||||
|
||||
def pack(data: bytes, metadata: dict):
|
||||
package = {"data": bytes_to_string(data), "metadata": metadata}
|
||||
return package
|
||||
|
||||
|
||||
def unpack(package):
|
||||
data, metadata = itemgetter("data", "metadata")(package)
|
||||
return string_to_bytes(data), metadata
|
||||
|
||||
|
||||
def unpack_fn_pack(fn):
|
||||
return compose(starlift(pack), fn, lift(unpack))
|
||||
@ -1,97 +1,17 @@
|
||||
from _operator import itemgetter
|
||||
from itertools import takewhile, starmap, repeat, chain, tee
|
||||
from typing import Iterable, Callable, Dict, List, Union, Tuple
|
||||
from funcy import compose, identity
|
||||
|
||||
import requests
|
||||
from funcy import repeatedly, ilen, compose, identity
|
||||
|
||||
from pyinfra.exceptions import UnexpectedItemType
|
||||
from pyinfra.utils.func import lift, star, starlift
|
||||
from test.utils.server import bytes_to_string, string_to_bytes
|
||||
|
||||
|
||||
def stream_response_payloads(endpoint):
|
||||
def receive():
|
||||
response = requests.get(endpoint)
|
||||
return response
|
||||
|
||||
def more_is_coming(response):
|
||||
return response.status_code == 206
|
||||
|
||||
def load_payload(response):
|
||||
return response.json()
|
||||
|
||||
response_stream = takewhile(more_is_coming, repeatedly(receive))
|
||||
payloads = map(load_payload, response_stream)
|
||||
yield from payloads
|
||||
|
||||
|
||||
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
|
||||
yield from starmap(pack, zip(data, metadata))
|
||||
|
||||
|
||||
def pack(data: bytes, metadata: dict):
|
||||
package = {"data": bytes_to_string(data), "metadata": metadata}
|
||||
return package
|
||||
|
||||
|
||||
def unpack(package):
|
||||
data, metadata = itemgetter("data", "metadata")(package)
|
||||
return string_to_bytes(data), metadata
|
||||
|
||||
|
||||
def dispatch_methods(data):
|
||||
return *repeat(requests.patch, ilen(data) - 1), requests.post
|
||||
|
||||
|
||||
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
|
||||
return compose(starlift(pack), normalize, operation, lift(unpack))
|
||||
|
||||
|
||||
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
|
||||
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
|
||||
|
||||
|
||||
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
|
||||
if isinstance(itm, tuple):
|
||||
return [itm]
|
||||
elif isinstance(itm, Iterable):
|
||||
return itm
|
||||
else:
|
||||
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
|
||||
|
||||
|
||||
def inspect(msg="ins", embed=False):
|
||||
def inner(x):
|
||||
|
||||
if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple):
|
||||
x, y = tee(x)
|
||||
y = list(y)
|
||||
else:
|
||||
y = x
|
||||
|
||||
l = len(y) if isinstance(y, list) else ""
|
||||
print(msg, l, y)
|
||||
|
||||
if embed:
|
||||
import IPython
|
||||
|
||||
IPython.embed()
|
||||
|
||||
return x
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
def make_streamable(fn, batched):
|
||||
return compose(normalize, (identity if batched else starlift)(fn))
|
||||
|
||||
|
||||
def unpack_fn_pack(fn):
|
||||
return compose(starlift(pack), fn, lift(unpack))
|
||||
from pyinfra.server.normalization import normalize
|
||||
from pyinfra.server.packing import unpack_fn_pack
|
||||
from pyinfra.utils.func import starlift
|
||||
|
||||
|
||||
def make_streamable_and_wrap_in_packing_logic(fn, batched):
|
||||
fn = make_streamable(fn, batched)
|
||||
fn = unpack_fn_pack(fn)
|
||||
return fn
|
||||
|
||||
|
||||
def make_streamable(fn, batched):
|
||||
return compose(normalize, (identity if batched else starlift)(fn))
|
||||
|
||||
|
||||
|
||||
3
test/fixtures/input.py
vendored
3
test/fixtures/input.py
vendored
@ -6,7 +6,8 @@ from PIL import Image
|
||||
from funcy import lmap, compose, flatten, lflatten
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.utils import pack, unpack, normalize_item
|
||||
from pyinfra.server.normalization import normalize_item
|
||||
from pyinfra.server.packing import pack, unpack
|
||||
from pyinfra.utils.func import star, lift, lstarlift
|
||||
from test.utils.image import image_to_bytes
|
||||
from test.utils.server import string_to_bytes
|
||||
|
||||
@ -2,7 +2,7 @@ import pytest
|
||||
|
||||
from pyinfra.server.packer.packers.identity import IdentityPacker, bundle
|
||||
from pyinfra.server.packer.packers.rest import RestPacker
|
||||
from pyinfra.server.utils import pack
|
||||
from pyinfra.server.packing import pack
|
||||
from pyinfra.utils.func import lstarlift
|
||||
|
||||
|
||||
|
||||
@ -10,7 +10,7 @@ from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStre
|
||||
from pyinfra.server.packer.packers.rest import RestPacker
|
||||
from pyinfra.server.receiver.receivers.identity import IdentityReceiver
|
||||
from pyinfra.server.receiver.receivers.rest import RestReceiver
|
||||
from pyinfra.server.utils import unpack
|
||||
from pyinfra.server.packing import unpack
|
||||
from pyinfra.utils.func import lift
|
||||
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ import pytest
|
||||
from funcy import compose, lzip
|
||||
|
||||
from pyinfra.server.packer.packers.identity import bundle
|
||||
from pyinfra.server.utils import pack, unpack
|
||||
from pyinfra.server.packing import pack, unpack
|
||||
from pyinfra.utils.func import lstarlift
|
||||
from test.utils.server import bytes_to_string
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user