refactoring; fixed sender

This commit is contained in:
Matthias Bisping 2022-05-04 16:57:08 +02:00
parent 14d83abd72
commit 531ff8d3e0
9 changed files with 117 additions and 18 deletions

View File

@ -17,14 +17,13 @@ def has_next(peekable_iter):
class RestSender(Sender):
def __init__(self, endpoint):
self.endpoint = endpoint
print(endpoint)
def __call__(self, packages: Iterable[dict]):
packages = peekable(packages)
for package in packages:
if has_next(packages):
yield requests.patch(self.endpoint, json=package)
else:
yield requests.post(self.endpoint, json=package)
method = requests.patch if has_next(packages) else requests.post
response = method(self.endpoint, json=package)
response.raise_for_status()
yield response

View File

@ -0,0 +1,91 @@
from _operator import methodcaller, itemgetter
from itertools import takewhile, starmap, repeat, chain
from typing import Iterable, Callable, Dict, List, Union, Tuple
import requests
from funcy import repeatedly, identity, ilen, compose
from pyinfra.exceptions import UnexpectedItemType
from pyinfra.utils.func import parallel_map, lift, lstarlift, star
from test.utils.server import bytes_to_string, string_to_bytes
def stream_response_payloads(endpoint):
def receive():
response = requests.get(endpoint)
return response
def more_is_coming(response):
return response.status_code == 206
def load_payload(response):
return response.json()
response_stream = takewhile(more_is_coming, repeatedly(receive))
payloads = map(load_payload, response_stream)
yield from payloads
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata))
def dispatch_http_method_left_and_forward_data_right(*args):
return parallel_map(dispatch_methods, lift(identity))(*args)
def extract_payload_from_responses(payloads):
return map(methodcaller("json"), payloads)
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
return package
def unpack(package):
data, metadata = itemgetter("data", "metadata")(package)
return string_to_bytes(data), metadata
def dispatch_methods(data):
return *repeat(requests.patch, ilen(data) - 1), requests.post
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
return compose(lstarlift(pack), normalize, star(operation), unpack)
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
return compose(lstarlift(pack), normalize, operation, lift(unpack))
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
if isinstance(itm, tuple):
return [itm]
elif isinstance(itm, Iterable):
return itm
else:
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
def inspect(msg="ins", embed=False):
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict):
x = list(x)
print(msg, x)
if embed:
import IPython
IPython.embed()
return x
return inner

View File

@ -1,7 +1,8 @@
import pytest
from funcy import lmap
from pyinfra.server.rest import process_eagerly, unpack
from pyinfra.server.rest import process_eagerly
from pyinfra.server.utils import unpack
@pytest.mark.parametrize("batched", [True, False])

View File

@ -1,7 +1,8 @@
import pytest
from funcy import lmap
from pyinfra.server.rest import unpack, process_lazily
from pyinfra.server.rest import process_lazily
from pyinfra.server.utils import unpack
@pytest.mark.parametrize("batched", [True, False])

View File

@ -5,8 +5,8 @@ import pytest
from PIL import Image
from funcy import lmap, compose, flatten
from pyinfra.server.rest import pack, normalize_item, unpack
from pyinfra.utils.func import star, lift
from pyinfra.server.utils import pack, unpack, normalize_item
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
@ -67,3 +67,8 @@ def images(n_items):
@pytest.fixture
def metadata(n_items):
return list(repeat({"dummy": True}, n_items))
@pytest.fixture
def packages(input_data_items, metadata):
return lstarlift(pack)(zip(input_data_items, metadata))

View File

@ -10,7 +10,7 @@ from PIL import Image
from funcy import retry, compose
from waitress import serve
from pyinfra.server.rest import unpack_op_pack, unpack_batchop_pack
from pyinfra.server.utils import unpack_op_pack, unpack_batchop_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import llift, starlift
from pyinfra.server.server import set_up_processing_server

View File

@ -2,7 +2,7 @@ import pytest
from pyinfra.server.packer.packers.identity import IdentityPacker, bundle
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.rest import pack
from pyinfra.server.utils import pack
from pyinfra.utils.func import lstarlift

View File

@ -1,8 +1,10 @@
def test_identity_sender(data, metadata):
packer = IdentityPacker()
assert list(packer(data, metadata)) == lstarlift(bundle)(zip(data, metadata))
import pytest
from pyinfra.server.sender.senders.rest import RestSender
def test_rest_packer(data, metadata):
packer = RestPacker()
assert list(packer(data, metadata)) == lstarlift(pack)(zip(data, metadata))
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_rest_packer(url, packages, server_process):
sender = RestSender(f"{url}/process")
assert all([r.status_code == 200 for r in sender(packages)])

View File

@ -2,7 +2,7 @@ import pytest
from funcy import compose, lzip
from pyinfra.server.packer.packers.identity import bundle
from pyinfra.server.rest import unpack, pack
from pyinfra.server.utils import pack, unpack
from pyinfra.utils.func import lstarlift
from test.utils.server import bytes_to_string