refactoring

Matthias Bisping 2022-05-03 18:06:28 +02:00
parent 85d7ad52dc
commit 51a6bf9875
2 changed files with 45 additions and 64 deletions

View File

@@ -1,11 +1,11 @@
 import logging
-from _operator import methodcaller
-from itertools import repeat, chain
+from _operator import methodcaller, itemgetter
+from itertools import repeat, chain, starmap, takewhile
-from operator import itemgetter
 from typing import Iterable, Dict, List, Callable, Union, Tuple
 import requests
-from funcy import compose, ilen, rpartial, identity, flatten, rcompose
+from funcy import compose, ilen, identity, flatten, rcompose, repeatedly
 from pyinfra.exceptions import UnexpectedItemType
 from pyinfra.utils.func import star, lift, lstarlift, parallel_map, starlift
@@ -79,7 +79,6 @@ def post_partial(url, data: Iterable[bytes], metadata: dict):
         response.raise_for_status()
         return response
 
-    pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata))
     dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity))
     send_data_with_method_to_analyzer = starlift(send)
     extract_payload_from_responses = lift(methodcaller("json"))
@@ -92,4 +91,43 @@ def post_partial(url, data: Iterable[bytes], metadata: dict):
         flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten.
     )
-    return input_data_to_result_data(data)
+    return input_data_to_result_data(data, repeat(metadata))
+
+
+def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
+    yield from starmap(pack, zip(data, metadata))
+
+
+def stream_response_payloads(endpoint):
+    def receive():
+        response = requests.get(endpoint)
+        return response
+
+    def more_is_coming(response):
+        return response.status_code == 206
+
+    def load_payload(response):
+        return response.json()
+
+    response_stream = takewhile(more_is_coming, repeatedly(receive))
+    payloads = map(load_payload, response_stream)
+    yield from payloads
+
+
+def submit_and_pickup(url, data, metadata):
+    def post(package):
+        response = requests.post(f"{url}/submit", json=package)
+        response.raise_for_status()
+        return response
+
+    input_data_to_payload_stream = rcompose(
+        pack_data_and_metadata_for_rest_transfer,
+        lift(post),
+        lift(methodcaller("json")),
+        lift(itemgetter("pickup_endpoint")),
+        lift(lambda ep: f"{url}/{ep}"),
+        lift(stream_response_payloads),
+        flatten,
+    )
+    yield from input_data_to_payload_stream(data, repeat(metadata))
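
For orientation, a minimal caller-side sketch of the new submit/pickup flow, inferred from the code above: the server is assumed to answer POST {url}/submit with a JSON body containing a "pickup_endpoint", and to reply with HTTP 206 on that endpoint for as long as further result chunks are available (the first non-206 response ends the stream and its body is not yielded). The names analyzer_url, frames, and meta below are illustrative only, not part of the commit.

# Hypothetical usage sketch, not part of the commit.
from pyinfra.rest import submit_and_pickup

analyzer_url = "http://analyzer.example:8080"  # assumed service address
frames = [b"frame-0", b"frame-1"]              # raw input items
meta = {"source": "camera-1"}                  # metadata repeated for every item

# Each item is packed with the metadata, POSTed to {analyzer_url}/submit,
# and the returned pickup endpoint is polled; one decoded JSON payload is
# yielded per 206 response.
for payload in submit_and_pickup(analyzer_url, frames, meta):
    print(payload)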

View File

@@ -1,64 +1,7 @@
-from functools import partial
-from itertools import starmap, repeat
-from itertools import takewhile
-from operator import itemgetter, methodcaller
-from typing import Iterable, Tuple
 import pytest
-import requests
-from funcy import repeatedly, flatten, rcompose, lmap
+from funcy import lmap
-from pyinfra.rest import pack, unpack
-from pyinfra.utils.func import lift
-
-
-def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
-    yield from starmap(pack, zip(data, metadata))
-
-
-def send_data_to_analyzer(url, payloads: Iterable[Tuple]):
-    def post(package):
-        response = requests.post(f"{url}/submit", json=package)
-        response.raise_for_status()
-        return response
-
-    yield from map(post, payloads)
-
-
-def stream_response_payloads(endpoint):
-    def receive():
-        response = requests.get(endpoint)
-        return response
-
-    def more_is_coming(response):
-        return response.status_code == 206
-
-    def load_payload(response):
-        return response.json()
-
-    response_stream = takewhile(more_is_coming, repeatedly(receive))
-    payloads = map(load_payload, response_stream)
-    yield from payloads
-
-
-def submit_and_pickup(url, data, metadata):
-    def post(package):
-        response = requests.post(f"{url}/submit", json=package)
-        response.raise_for_status()
-        return response
-
-    input_data_to_payload_stream = rcompose(
-        pack_data_and_metadata_for_rest_transfer,
-        lift(post),
-        lift(methodcaller("json")),
-        lift(itemgetter("pickup_endpoint")),
-        lift(lambda ep: f"{url}/{ep}"),
-        lift(stream_response_payloads),
-        flatten,
-    )
-    yield from input_data_to_payload_stream(data, repeat(metadata))
+from pyinfra.rest import unpack, submit_and_pickup
 
 
 @pytest.mark.parametrize("batched", [True, False])