refactoring: move

This commit is contained in:
Matthias Bisping 2022-05-03 09:56:12 +02:00
parent ea9b405d2a
commit 92190a42f0
2 changed files with 37 additions and 42 deletions

View File

@ -1,13 +1,19 @@
from operator import itemgetter
import logging
from _operator import methodcaller
from itertools import repeat
from operator import itemgetter
from typing import Iterable, Dict, List, Callable, Union, Tuple
from funcy import compose, first
import requests
from funcy import compose, first, ilen, rpartial, identity, flatten, rcompose
from pyinfra.exceptions import UnexpectedItemType
from pyinfra.utils.func import star, lift, lstarlift
from pyinfra.utils.func import star, lift, lstarlift, parallel_map, starlift
from test.utils.server import bytes_to_string, string_to_bytes
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.WARNING)
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
@ -70,3 +76,30 @@ def inspect(msg="ins", embed=False):
return x
return inner
def dispatch_methods(input_data):
return *repeat(requests.patch, ilen(input_data) - 1), requests.post
def post_partial(url, input_data: Iterable[bytes], metadata):
def send(method, data):
response = method(url, json=data)
response.raise_for_status()
return response
pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata))
dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity))
send_data_with_method_to_analyzer = starlift(send)
extract_payload_from_responses = compose(flatten, lift(methodcaller("json")))
flatten_buffered_payloads = flatten
input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
extract_payload_from_responses,
flatten_buffered_payloads,
)
return input_data_to_result_data(input_data)

View File

@ -1,44 +1,6 @@
import logging
from itertools import repeat
from operator import methodcaller
from typing import Iterable
import pytest
import requests
from funcy import rcompose, compose, rpartial, identity, ilen, flatten
from pyinfra.rest import pack, inspect
from pyinfra.utils.func import lift, starlift, parallel_map
logger = logging.getLogger("PIL.PngImagePlugin")
logger.setLevel(logging.WARNING)
def dispatch_methods(input_data):
return *repeat(requests.patch, ilen(input_data) - 1), requests.post
def post_partial(url, input_data: Iterable[bytes], metadata):
def send(method, data):
response = method(url, json=data)
response.raise_for_status()
return response
pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata))
dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity))
send_data_with_method_to_analyzer = starlift(send)
extract_payload_from_responses = compose(flatten, lift(methodcaller("json")))
flatten_buffered_payloads = flatten
input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
extract_payload_from_responses,
flatten_buffered_payloads,
)
return input_data_to_result_data(input_data)
from pyinfra.rest import post_partial
@pytest.mark.parametrize("batched", [True, False])