159 lines
3.9 KiB
Python
159 lines
3.9 KiB
Python
from functools import partial
|
|
from itertools import starmap, repeat
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from PIL import Image
|
|
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip
|
|
|
|
from pyinfra.server.dispatcher.dispatcher import Nothing
|
|
from pyinfra.server.normalization import normalize_item
|
|
from pyinfra.server.packing import pack, unpack
|
|
from pyinfra.utils.func import star, lift, lstarlift
|
|
from test.utils.image import image_to_bytes
|
|
from test.utils.input import pair_data_with_queue_message
|
|
from test.utils.pdf import pdf_stream
|
|
|
|
|
|
@pytest.fixture
def data(data_type, pdf):
    """Return the raw payload that corresponds to ``data_type``.

    ``"pdf"`` yields the value of the ``pdf`` fixture unchanged and
    ``"bytestring"`` yields the literal ``"content"`` encoded as Latin-1.

    Raises:
        ValueError: If ``data_type`` is neither ``"pdf"`` nor ``"bytestring"``.
    """
    if data_type == "pdf":
        return pdf
    if data_type == "bytestring":
        return "content".encode("latin1")
    # Fail loudly, as the item-type selector fixtures in this module do,
    # instead of silently handing ``None`` to the test.
    raise ValueError(f"Unknown data type {data_type}")
|
|
|
|
|
|
@pytest.fixture
def input_data_items(unencoded_input_data, input_data_encoder):
    """Return the input items after running them through the selected encoder.

    ``input_data_encoder`` is called once with the whole ``unencoded_input_data``
    collection and its result is returned as-is.
    """
    encoded_items = input_data_encoder(unencoded_input_data)
    return encoded_items
|
|
|
|
|
|
@pytest.fixture
def unencoded_input_data(item_type, unencoded_strings, unencoded_images, unencoded_pdfs):
    """Select the unencoded input collection that matches ``item_type``.

    Returns ``unencoded_strings`` for ``"string"``, ``unencoded_images`` for
    ``"image"`` and ``unencoded_pdfs`` for ``"pdf"``.

    Raises:
        ValueError: If ``item_type`` is none of the three known types.
    """
    if item_type == "string":
        return unencoded_strings
    if item_type == "image":
        return unencoded_images
    if item_type == "pdf":
        return unencoded_pdfs
    raise ValueError(f"Unknown item type {item_type}")
|
|
|
|
|
|
@pytest.fixture
def input_data_encoder(item_type):
    """Return the encoder function that matches ``item_type``.

    ``"string"`` maps to ``strings_to_bytes``, ``"image"`` to ``images_to_bytes``
    and ``"pdf"`` to ``pdfs_to_bytes``. The function itself is returned; it is
    not called here.

    Raises:
        ValueError: If ``item_type`` is none of the three known types.
    """
    # Ordered (type name, encoder) pairs, compared with ``==`` one by one.
    known_encoders = (
        ("string", strings_to_bytes),
        ("image", images_to_bytes),
        ("pdf", pdfs_to_bytes),
    )
    for known_type, encoder in known_encoders:
        if item_type == known_type:
            return encoder
    raise ValueError(f"Unknown item type {item_type}")
|
|
|
|
|
|
@pytest.fixture
def unencoded_pdfs(n_items, unencoded_pdf):
    """Return a list holding ``unencoded_pdf`` ``n_items`` times.

    Every entry refers to the same ``unencoded_pdf`` object; no copies are made.
    """
    return [unencoded_pdf for _ in range(n_items)]
|
|
|
|
|
|
def pdfs_to_bytes(unencoded_pdfs):
    """Apply ``pdf_stream`` to every PDF in ``unencoded_pdfs`` and return the results as a list."""
    return list(map(pdf_stream, unencoded_pdfs))
|
|
|
|
|
|
@pytest.fixture
def target_data_items(input_data_items, core_operation, metadata):
    """Build the expected results of applying ``core_operation`` to the inputs.

    Returns ``Nothing`` when ``core_operation`` is ``Nothing``. Otherwise every
    ``(item, metadata)`` pair is passed to ``core_operation``, its result goes
    through ``normalize_item``, and the per-item outputs are flattened into one
    list.
    """
    if core_operation is Nothing:
        return Nothing

    # ``compose`` applies right-to-left: core_operation first, then normalize_item.
    normalized_operation = compose(normalize_item, core_operation)
    per_item_results = starmap(normalized_operation, zip(input_data_items, metadata))
    return lflatten(per_item_results)
|
|
|
|
|
|
@pytest.fixture
def unencoded_strings(n_items):
    """Return the strings ``"content0"`` ... ``"content<n_items - 1>"`` as a list."""
    return [f"content{index}" for index in range(n_items)]
|
|
|
|
|
|
def strings_to_bytes(strings):
    """Encode every string in ``strings`` as UTF-8 and return the results as a list."""
    return [bytes(text, "utf8") for text in strings]
|
|
|
|
|
|
@pytest.fixture
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test):
    """Build the expected ``(data, metadata)`` response pairs for ``operation``.

    TODO: this has become super wonky.

    Steps:
      1. Each item's metadata is merged on top of the second element of the
         matching entry of ``data_message_pairs`` (item metadata wins on
         key clashes).
      2. If ``operation`` is ``Nothing``, ``Nothing`` is returned.
      3. Otherwise every ``(item, merged metadata)`` pair is run through
         ``operation``, then ``normalize_item``, then ``lift(star(pack))``;
         the flattened results are unpacked into data and metadata.
      4. The keys of a sample queue message (plus ``"id"`` unless
         ``server_side_test`` is truthy) are omitted from the response metadata.

    A ``ValueError`` raised during steps 3-4 (for instance when there are no
    results to unzip into two sequences) yields an empty list instead.
    """
    queue_messages = lmap(second, data_message_pairs)
    metadata = [{**message, **item_metadata} for message, item_metadata in zip(queue_messages, metadata)]

    if operation is Nothing:
        return Nothing

    process_and_pack = compose(lift(star(pack)), normalize_item, operation)

    try:
        # Everything below is lazy until ``zip(*...)`` consumes it, so the
        # operation itself executes inside this ``try`` block.
        packed_results = flatten(starmap(process_and_pack, zip(input_data_items, metadata)))
        response_data, response_metadata = zip(*map(unpack, packed_results))

        # Keys taken from the message paired with a dummy payload.
        sample_message = second(first(pair_data_with_queue_message([b""])))
        queue_message_keys = ([] if server_side_test else ["id"]) + list(sample_message.keys())

        stripped_metadata = [omit(item_metadata, queue_message_keys) for item_metadata in response_metadata]
        return lzip(response_data, stripped_metadata)
    except ValueError:
        return []
|
|
|
|
|
|
@pytest.fixture
def endpoint(url):
    """Return the submit endpoint: ``url`` with ``/submit`` appended."""
    submit_url = f"{url}/submit"
    return submit_url
|
|
|
|
|
|
@pytest.fixture(params=["rest", "basic"])
def client_pipeline_type(request):
    """Parametrized fixture: provides ``"rest"`` and ``"basic"`` in turn."""
    pipeline_type = request.param
    return pipeline_type
|
|
|
|
|
|
@pytest.fixture(params=[1, 0, 5])
def n_items(request):
    """Parametrized fixture: provides the item counts 1, 0 and 5 in turn."""
    item_count = request.param
    return item_count
|
|
|
|
|
|
@pytest.fixture(params=[0, 100])
def n_pages(request):
    """Parametrized fixture: provides the page counts 0 and 100 in turn."""
    page_count = request.param
    return page_count
|
|
|
|
|
|
@pytest.fixture(params=[1, 5])
def buffer_size(request):
    """Parametrized fixture: provides the buffer sizes 1 and 5 in turn."""
    size = request.param
    return size
|
|
|
|
|
|
def array_to_image(array) -> Image.Image:
    """Convert ``array`` into a PIL image in ``"RGB"`` mode.

    The values are multiplied by 255 and cast to ``uint8`` before being passed
    to ``Image.fromarray``; presumably ``array`` holds floats in ``[0, 1)`` as
    produced by ``input_batch`` -- TODO confirm for other callers.

    NOTE(review): ``input_batch`` in this file yields channel-first
    ``(3, 30, 30)`` slices; confirm that this is the layout
    ``Image.fromarray`` should receive for mode ``"RGB"``.
    """
    pixel_values = np.uint8(array * 255)
    return Image.fromarray(pixel_values, mode="RGB")
|
|
|
|
|
|
def input_batch(n_items):
    """Return a random float array of shape ``(n_items, 3, 30, 30)``.

    Values are drawn with ``np.random.random_sample``, i.e. from the half-open
    interval ``[0.0, 1.0)`` using NumPy's global random state.
    """
    batch_shape = (n_items, 3, 30, 30)
    return np.random.random_sample(size=batch_shape)
|
|
|
|
|
|
@pytest.fixture
def unencoded_images(n_items):
    """Return ``n_items`` random images as a list.

    A random batch is drawn with ``input_batch`` and each entry along its first
    axis is converted with ``array_to_image``.
    """
    batch = input_batch(n_items)
    return [array_to_image(array) for array in batch]
|
|
|
|
|
|
def images_to_bytes(images):
    """Apply ``image_to_bytes`` to every image in ``images`` and return the results as a list."""
    return [image_to_bytes(image) for image in images]
|
|
|
|
|
|
@pytest.fixture
def metadata(n_items):
    """Return a list of ``n_items`` metadata dicts, each equal to ``{"key": "value"}``.

    Every entry is a separate dict object, so a consumer that mutates one
    item's metadata cannot affect the metadata of the other items.
    """
    # A fresh literal per iteration: ``repeat(d, n)`` (or ``[d] * n``) would
    # put the very same mutable dict into the list ``n`` times.
    return [{"key": "value"} for _ in range(n_items)]
|
|
|
|
|
|
@pytest.fixture
def packages(input_data_items, metadata):
    """Return the result of ``lstarlift(pack)`` applied to the zipped inputs.

    ``input_data_items`` and ``metadata`` are paired positionally with ``zip``
    (which stops at the shorter of the two) and the pairs are handed to
    ``lstarlift(pack)``.
    """
    pack_pairs = lstarlift(pack)
    item_metadata_pairs = zip(input_data_items, metadata)
    return pack_pairs(item_metadata_pairs)
|