refactoring; rest sender

This commit is contained in:
Matthias Bisping 2022-05-04 16:46:24 +02:00
parent 00ea224379
commit 14d83abd72
3 changed files with 37 additions and 100 deletions

View File

@ -1,17 +1,7 @@
from itertools import starmap
from typing import Iterable
from pyinfra.server.packer.packer import Packer
from test.utils.server import bytes_to_string
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
return package
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata))
from pyinfra.server.utils import pack_data_and_metadata_for_rest_transfer
class RestPacker(Packer):

View File

@ -1,16 +1,14 @@
import logging
from itertools import repeat, chain, takewhile, starmap
from operator import itemgetter, methodcaller
from typing import Iterable, Dict, List, Callable, Union, Tuple
from operator import itemgetter
from typing import Iterable
import requests
from funcy import compose, ilen, identity, flatten, rcompose, repeatedly
from funcy import identity, rcompose, flatten
from pyinfra.exceptions import UnexpectedItemType
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.sender.senders.rest import RestSender
from pyinfra.utils.func import star, lift, lstarlift, parallel_map
from test.utils.server import bytes_to_string, string_to_bytes
from pyinfra.server.utils import stream_response_payloads, extract_payload_from_responses
from pyinfra.utils.func import lift
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.WARNING)
@ -42,22 +40,6 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]):
yield from pipe(data, metadata)
def stream_response_payloads(endpoint):
def receive():
response = requests.get(endpoint)
return response
def more_is_coming(response):
return response.status_code == 206
def load_payload(response):
return response.json()
response_stream = takewhile(more_is_coming, repeatedly(receive))
payloads = map(load_payload, response_stream)
yield from payloads
def pipeline(url, receiver):
return rcompose(
head(url),
@ -80,10 +62,6 @@ def head(endpoint):
return send
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata))
def sender(endpoint):
def send(method, data):
response = method(endpoint, json=data)
@ -91,64 +69,3 @@ def sender(endpoint):
return response
return send
def dispatch_http_method_left_and_forward_data_right(*args):
return parallel_map(dispatch_methods, lift(identity))(*args)
def extract_payload_from_responses(payloads):
return map(methodcaller("json"), payloads)
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
return package
def unpack(package):
data, metadata = itemgetter("data", "metadata")(package)
return string_to_bytes(data), metadata
def dispatch_methods(data):
return *repeat(requests.patch, ilen(data) - 1), requests.post
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
return compose(lstarlift(pack), normalize, star(operation), unpack)
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
return compose(lstarlift(pack), normalize, operation, lift(unpack))
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
if isinstance(itm, tuple):
return [itm]
elif isinstance(itm, Iterable):
return itm
else:
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
def inspect(msg="ins", embed=False):
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict):
x = list(x)
print(msg, x)
if embed:
import IPython
IPython.embed()
return x
return inner

View File

@ -0,0 +1,30 @@
from typing import Iterable
import requests
from more_itertools import peekable
from pyinfra.server.sender.sender import Sender
class Nothing:
pass
def has_next(peekable_iter):
return peekable_iter.peek(Nothing) != Nothing
class RestSender(Sender):
def __init__(self, endpoint):
self.endpoint = endpoint
print(endpoint)
def __call__(self, packages: Iterable[dict]):
packages = peekable(packages)
for package in packages:
if has_next(packages):
yield requests.patch(self.endpoint, json=package)
else:
yield requests.post(self.endpoint, json=package)