# pyinfra/test/fixtures/input.py
#
# 159 lines
# 3.9 KiB
# Python

from functools import partial
from itertools import starmap, repeat
import numpy as np
import pytest
from PIL import Image
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.normalization import normalize_item
from pyinfra.server.packing import pack, unpack
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
from test.utils.input import pair_data_with_queue_message
from test.utils.pdf import pdf_stream
@pytest.fixture
def data(data_type, pdf):
    """Provide the raw payload that corresponds to ``data_type``.

    Returns the ``pdf`` fixture value for ``"pdf"`` and the latin-1 encoded
    bytes of ``"content"`` for ``"bytestring"``. No other value is matched,
    so any other ``data_type`` yields ``None``.
    """
    if data_type == "pdf":
        return pdf
    if data_type == "bytestring":
        payload = "content".encode("latin1")
        return payload
    # No branch matched: the fixture value is None.
    return None
@pytest.fixture
def input_data_items(unencoded_input_data, input_data_encoder):
    """Return the result of applying ``input_data_encoder`` to ``unencoded_input_data``."""
    encoded_items = input_data_encoder(unencoded_input_data)
    return encoded_items
@pytest.fixture
def unencoded_input_data(item_type, unencoded_strings, unencoded_images, unencoded_pdfs):
    """Select the unencoded items that belong to ``item_type``.

    Returns ``unencoded_strings``, ``unencoded_images`` or ``unencoded_pdfs``
    for the item types ``"string"``, ``"image"`` and ``"pdf"`` respectively.

    Raises:
        ValueError: if ``item_type`` is none of the known types.
    """
    candidates = (
        ("string", unencoded_strings),
        ("image", unencoded_images),
        ("pdf", unencoded_pdfs),
    )
    for known_type, items in candidates:
        if item_type == known_type:
            return items
    raise ValueError(f"Unknown item type {item_type}")
@pytest.fixture
def input_data_encoder(item_type):
    """Select the function that encodes items of ``item_type`` to bytes.

    Returns ``strings_to_bytes``, ``images_to_bytes`` or ``pdfs_to_bytes``
    for the item types ``"string"``, ``"image"`` and ``"pdf"`` respectively.

    Raises:
        ValueError: if ``item_type`` is none of the known types.
    """
    if item_type == "string":
        return strings_to_bytes
    if item_type == "image":
        return images_to_bytes
    if item_type == "pdf":
        return pdfs_to_bytes
    raise ValueError(f"Unknown item type {item_type}")
@pytest.fixture
def unencoded_pdfs(n_items, unencoded_pdf):
    """Return a list holding ``unencoded_pdf`` ``n_items`` times.

    Every entry refers to the same ``unencoded_pdf`` object.
    """
    single_item = [unencoded_pdf]
    return single_item * n_items
def pdfs_to_bytes(unencoded_pdfs):
    """Return a list with ``pdf_stream`` applied to each item of ``unencoded_pdfs``."""
    return list(map(pdf_stream, unencoded_pdfs))
@pytest.fixture
def target_data_items(input_data_items, core_operation, metadata):
    """Compute the expected items produced by ``core_operation``.

    Returns ``Nothing`` when ``core_operation`` is ``Nothing``. Otherwise the
    composition ``normalize_item(core_operation(item, meta))`` is evaluated
    for every ``(item, meta)`` pair and the results are flattened into one
    list.
    """
    if core_operation is Nothing:
        return Nothing

    op = compose(normalize_item, core_operation)
    item_meta_pairs = zip(input_data_items, metadata)
    return lflatten(op(*pair) for pair in item_meta_pairs)
@pytest.fixture
def unencoded_strings(n_items):
    """Return the strings ``"content0"`` ... ``"content<n_items - 1>"``."""
    strings = []
    for i in range(n_items):
        strings.append(f"content{i}")
    return strings
def strings_to_bytes(strings):
    """Encode every string in ``strings`` as UTF-8 and return the results as a list."""
    encoded = []
    for text in strings:
        encoded.append(bytes(text, encoding="utf8"))
    return encoded
@pytest.fixture
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test):
    """Build the expected ``(data, metadata)`` response pairs for ``operation``.

    TODO: this has become super wonky

    Returns ``Nothing`` when ``operation`` is ``Nothing``. Otherwise returns a
    list of ``(response_data, response_metadata)`` tuples, or ``[]`` when a
    ``ValueError`` is raised while computing them.
    """
    # Merge the second element of each data/message pair with the per-item
    # metadata; on key clashes the value from ``metadata`` wins.
    message_parts = lmap(second, data_message_pairs)
    merged_metadata = [
        {**from_message, **from_fixture}
        for from_message, from_fixture in zip(message_parts, metadata)
    ]

    if operation is Nothing:
        return Nothing

    op = compose(lift(star(pack)), normalize_item, operation)

    # NOTE(review): besides the empty-result case, this except also hides any
    # ValueError raised by the operation itself - confirm that is intended.
    try:
        results = flatten(starmap(op, zip(input_data_items, merged_metadata)))
        # With no results at all, the transposed zip is empty and this
        # two-name unpacking raises ValueError.
        response_data, response_metadata = zip(*map(unpack, results))

        # "id" is only stripped when this is not a server-side test.
        id_keys = [] if server_side_test else ["id"]
        sample_message = second(first(pair_data_with_queue_message([b""])))
        queue_message_keys = id_keys + [*sample_message.keys()]

        cleaned_metadata = [omit(meta, keys=queue_message_keys) for meta in response_metadata]
        return lzip(response_data, cleaned_metadata)
    except ValueError:
        return []
@pytest.fixture
def endpoint(url):
    """Return ``url`` with ``/submit`` appended."""
    submit_url = f"{url}/submit"
    return submit_url
@pytest.fixture(params=["rest", "basic"])
def client_pipeline_type(request):
    """Parametrized fixture yielding ``"rest"`` and ``"basic"`` in turn."""
    pipeline_type = request.param
    return pipeline_type
@pytest.fixture(params=[1, 0, 5])
def n_items(request):
    """Parametrized fixture yielding the item counts 1, 0 and 5 in turn."""
    item_count = request.param
    return item_count
@pytest.fixture(params=[0, 100])
def n_pages(request):
    """Parametrized fixture yielding the page counts 0 and 100 in turn."""
    page_count = request.param
    return page_count
@pytest.fixture(params=[1, 5])
def buffer_size(request):
    """Parametrized fixture yielding the buffer sizes 1 and 5 in turn."""
    size = request.param
    return size
def array_to_image(array) -> Image.Image:
    """Convert ``array`` to an RGB PIL image.

    The values are multiplied by 255 and cast to ``uint8`` before being handed
    to ``Image.fromarray`` with ``mode="RGB"``.
    """
    # NOTE(review): ``input_batch`` in this file yields channel-first
    # (3, 30, 30) arrays, while Pillow appears to read array shapes as
    # (height, width, channels) - confirm the resulting image size is intended.
    pixel_values = np.uint8(array * 255)
    return Image.fromarray(pixel_values, mode="RGB")
def input_batch(n_items):
    """Return a random float array of shape ``(n_items, 3, 30, 30)``.

    Values are drawn with ``np.random.random_sample``, i.e. from the
    half-open interval [0.0, 1.0).
    """
    batch_shape = (n_items, 3, 30, 30)
    return np.random.random_sample(size=batch_shape)
@pytest.fixture
def unencoded_images(n_items):
    """Return ``n_items`` images, one per array of a fresh ``input_batch(n_items)``."""
    batch = input_batch(n_items)
    return [array_to_image(array) for array in batch]
def images_to_bytes(images):
    """Return a list with ``image_to_bytes`` applied to each item of ``images``."""
    return [image_to_bytes(image) for image in images]
@pytest.fixture
def metadata(n_items):
    """Return one ``{"key": "value"}`` metadata dict per input item.

    Each entry is a distinct dict object. Repeating a single dict would put
    ``n_items`` references to the same mutable object in the list, so a
    consumer mutating one item's metadata would silently change all of them.
    """
    return [{"key": "value"} for _ in range(n_items)]
@pytest.fixture
def packages(input_data_items, metadata):
    """Return the result of ``lstarlift(pack)`` applied to the ``(item, metadata)`` pairs."""
    pack_all = lstarlift(pack)
    item_meta_pairs = zip(input_data_items, metadata)
    return pack_all(item_meta_pairs)