signature harminization for 1 -> 1 and 1 -> n WIP

This commit is contained in:
Matthias Bisping 2022-04-29 15:43:20 +02:00
parent 23070f3480
commit fd57261631
4 changed files with 59 additions and 32 deletions

View File

@ -1,8 +1,8 @@
from itertools import chain
from itertools import chain, tee
from operator import itemgetter
from typing import Iterable
from funcy import compose, first, rcompose
from more_itertools import flatten
from funcy import compose, first, rcompose, flatten
from pyinfra.utils.func import star, lift, lstarlift, starlift
from test.utils.server import bytes_to_string, string_to_bytes
@ -24,13 +24,30 @@ def bundle(data: bytes, metadata: dict):
def unpack_op_pack(operation):
return compose(first, lstarlift(pack), star(operation), unpack)
return compose(inspect("A2"), flatten, inspect("A1"), lstarlift(pack), star(operation), unpack)
def unpack_batchop_pack(operation):
return rcompose(
lift(unpack), # unpack the buffer items
operation, # apply operation on unpacked items
flatten, # operations may be 1 -> 1, 1 -> n or n -> 1, hence flatten
lstarlift(pack),
)
raise BrokenPipeError
# return rcompose(
# lift(unpack), # unpack the buffer items
# operation, # apply operation on unpacked items
# flatten, # operations may be 1 -> 1, 1 -> n or n -> 1, hence flatten
# lstarlift(pack),
# )
def inspect(msg="ins"):
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict):
print(11111111111111111111)
x = list(x)
else:
print("00000000000")
print(msg, x)
return x
return inner

View File

@ -7,8 +7,9 @@ from typing import Iterable
import pytest
import requests
from funcy import rcompose, compose, rpartial, identity, lmap, ilen, first
from more_itertools import flatten
from pyinfra.rest import pack, unpack, bundle
from pyinfra.rest import pack, unpack, bundle, inspect
from pyinfra.utils.func import lift, starlift, parallel_map, star, lstarlift
logger = logging.getLogger("PIL.PngImagePlugin")
@ -26,25 +27,33 @@ def post_partial(url, input_data: Iterable[bytes], metadata):
pack_data_and_metadata_for_rest_transfer = lift(rpartial(pack, metadata))
dispatch_http_method_left_and_forward_data_right = parallel_map(dispatch_methods, lift(identity))
send_data_with_method_to_analyzer = starlift(send)
extract_payload_from_responses = lift(methodcaller("json"))
flatten_buffered_payloads = chain.from_iterable
interpret_payloads = lift(compose(star(bundle), unpack))
extract_payload_from_responses = compose(flatten, lift(methodcaller("json")))
flatten_buffered_payloads = flatten
input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer,
dispatch_http_method_left_and_forward_data_right,
send_data_with_method_to_analyzer,
inspect("B"),
extract_payload_from_responses,
inspect("C"),
flatten_buffered_payloads,
interpret_payloads,
inspect("D"),
)
return input_data_to_result_data(input_data)
@pytest.mark.parametrize("item_type", ["string", "image"])
@pytest.mark.parametrize("item_type", ["string"])
def test_sending_partial_request(url, data_items, metadata, operation, server_process):
expected = lmap(compose(first, lstarlift(bundle), partial(operation, metadata=metadata)), data_items)
op = compose(lstarlift(pack), partial(operation, metadata=metadata))
expected = list(flatten(map(op, data_items)))
print()
print("exp")
print(expected)
output = list(post_partial(f"{url}/process", data_items, metadata))
print()
print("out")
print(output)
assert output == expected

View File

@ -1,5 +1,5 @@
import json
from operator import itemgetter, methodcaller
from operator import itemgetter, methodcaller, attrgetter
import pytest
import requests
@ -9,17 +9,18 @@ from pyinfra.rest import pack
@pytest.mark.parametrize("item_type", ["pdf"])
def test_sending_partial_request(url, server_process, pdf, metadata, operation):
def test_pickup_endpoint(url, server_process, pdf, metadata, operation):
def post(package):
return requests.post(f"{url}/submit", json=package)
def post(data):
return requests.post(f"{url}/submit", data=data)
pickup = compose(itemgetter("pickup_endpoint"), methodcaller("json"), post, rpartial(pack, metadata))(pdf)
pickup = compose(
itemgetter("pickup_endpoint"),
methodcaller("json"),
post,
rpartial(pack, metadata),
)(pdf)
print(pickup)
while True:
response = requests.get(f"{url}/{pickup}")
print(response)
# while True:
# response = requests.get(f"{url}/{pickup}")
# print(response)

View File

@ -58,11 +58,11 @@ def processor_fn(operation, buffer_size, batched):
@pytest.fixture
def operation(item_type, batched):
def upper(string: bytes, metadata):
return string.decode().upper().encode(), metadata
return [(string.decode().upper().encode(), metadata)]
def rotate(im: bytes, metadata):
im = Image.open(io.BytesIO(im))
return image_to_bytes(im.rotate(90)), metadata
return [(image_to_bytes(im.rotate(90)), metadata)]
def stream_pages(pdf: bytes, metadata):
for page in fitz.open(stream=pdf):
@ -79,7 +79,7 @@ def item_type(request):
return request.param
@pytest.fixture(params=[False, True])
@pytest.fixture(params=[False])
def batched(request):
"""Controls, whether the buffer processor function of the webserver is applied to batches or single items."""
return request.param