added extraction test case (with page index) to serving test

This commit is contained in:
Matthias Bisping 2022-06-03 13:13:45 +02:00
parent e7ee0cda42
commit eb81e96400
4 changed files with 101 additions and 97 deletions

View File

@@ -3,7 +3,7 @@ from itertools import starmap, repeat
import numpy as np
import pytest
from PIL import Image
from funcy import lmap, compose, flatten, lflatten
from funcy import lmap, compose, flatten, lflatten, omit
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.normalization import normalize_item
@@ -60,13 +60,13 @@ def pdfs_to_bytes(unencoded_pdfs):
@pytest.fixture
def target_data_items(input_data_items, core_operation):
def target_data_items(input_data_items, core_operation, metadata):
if core_operation is Nothing:
return Nothing
op = compose(normalize_item, core_operation)
expected = lflatten(map(op, input_data_items))
expected = lflatten(starmap(op, zip(input_data_items, metadata)))
return expected
@@ -134,7 +134,7 @@ def images_to_bytes(images):
@pytest.fixture
def metadata(n_items):
return list(repeat({"key": "value"}, n_items))
return list(repeat({"key": "value", "pages": [0, 2, 3]}, n_items))
@pytest.fixture

View File

@@ -6,10 +6,11 @@ from multiprocessing import Process
from typing import Generator
import fitz
import funcy
import pytest
import requests
from PIL import Image
from funcy import retry
from funcy import retry, project, omit
from waitress import serve
from pyinfra.server.dispatcher.dispatcher import Nothing
@@ -63,7 +64,8 @@ def operation_conditionally_batched(operation, batched):
def operation(core_operation):
def op(data, metadata):
assert isinstance(metadata, dict)
result = core_operation(data)
result = core_operation(data, metadata)
metadata = omit(metadata, ["pages", "operation"])
if isinstance(result, Generator):
return zip(result, repeat(metadata))
else:
@@ -76,21 +78,26 @@ def operation(core_operation):
@pytest.fixture
def core_operation(item_type, one_to_many, analysis_task):
def upper(string: bytes):
def duplicate(string: bytes, metadata):
for _ in range(2):
yield upper(string, metadata)
def upper(string: bytes, metadata):
return string.decode().upper().encode()
def duplicate(string: bytes):
for _ in range(2):
yield upper(string)
def extract(string: bytes, metadata):
for c in project(dict(enumerate(string.decode())), metadata["pages"]).values():
yield c.encode()
def rotate(im: bytes):
def rotate(im: bytes, metadata):
im = Image.open(io.BytesIO(im))
return image_to_bytes(im.rotate(90))
def classify(_: bytes):
def classify(_: bytes, metadata):
return b""
def stream_pages(pdf: bytes):
def stream_pages(pdf: bytes, metadata):
for i, page in enumerate(fitz.open(stream=pdf)):
# yield page.get_pixmap().tobytes("png"), metadata
yield f"page_{i}".encode()
@@ -101,7 +108,7 @@ def core_operation(item_type, one_to_many, analysis_task):
"image": {False: rotate, True: classify},
},
True: {
"string": {False: duplicate},
"string": {False: extract},
"pdf": {False: stream_pages},
},
}

View File

@@ -4,7 +4,7 @@ from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, lzip, lpluck
from funcy import compose, lpluck
from pyinfra.default_objects import (
get_callback,
@@ -14,53 +14,11 @@ from pyinfra.default_objects import (
get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import bytes_to_string, unpack, pack
from pyinfra.server.packing import unpack, pack
from pyinfra.visitor import get_object_descriptor, QueueVisitor
from test.utils.input import pair_data_with_queue_message
@pytest.fixture
def test_components(url, queue_manager, storage):
callback = get_callback(url)
visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
consumer = Consumer(visitor, queue_manager)
return storage, queue_manager, consumer
@pytest.fixture
def real_components(url):
callback = get_callback(url)
consumer = get_consumer(callback)
queue_manager = get_queue_manager()
storage = get_storage()
return storage, queue_manager, consumer
@pytest.fixture
def components(components_type, real_components, test_components):
if components_type == "real":
return real_components
elif components_type == "test":
return test_components
else:
raise ValueError(f"Unknown components type '{components_type}'.")
@pytest.fixture(params=["real", "mixed"])
def components_type(request):
return request.param
def decode(storage_item):
storage_item = json.loads(gzip.decompress(storage_item).decode())
if not isinstance(storage_item, list):
storage_item = [storage_item]
yield from map(unpack, storage_item)
@pytest.mark.parametrize(
"one_to_many",
[
@@ -78,13 +36,6 @@ def decode(storage_item):
@pytest.mark.parametrize("n_items", [1, 3])
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize(
"storage_item_has_metadata",
[
True,
False,
],
)
@pytest.mark.parametrize(
"item_type",
[
@@ -117,41 +68,88 @@ def decode(storage_item):
"real",
],
)
def test_serving(
server_process,
input_data_items,
metadata,
bucket_name,
components,
storage_item_has_metadata,
target_data_items,
targets,
):
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items):
storage, queue_manager, consumer = components
upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs)
consumer.consume_and_publish(n=n_items)
outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
targets = sorted(targets, key=itemgetter(0))
assert outputs == targets
@pytest.fixture
def data_message_pairs(input_data_items, metadata):
data_metadata_packs = starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata))
data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
return data_message_pairs
def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs):
for data, message in data_message_pairs:
upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message)
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message):
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
queue_manager.publish_request(message)
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
return outputs
@pytest.fixture
def components(components_type, real_components, test_components, bucket_name):
if components_type == "real":
components = real_components
elif components_type == "test":
components = test_components
else:
raise ValueError(f"Unknown components type '{components_type}'.")
storage, queue_manager, consumer = components
queue_manager.clear()
storage.clear_bucket(bucket_name)
if storage_item_has_metadata:
data_metadata_packs = starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata))
else:
data_metadata_packs = map(compose(lambda s: s.encode(), json.dumps, bytes_to_string), input_data_items)
metadata = repeat({})
targets = lzip(target_data_items, metadata)
targets = sorted(targets, key=itemgetter(0))
data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
for data, message in data_message_pairs:
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
queue_manager.publish_request(message)
consumer.consume_and_publish(n=len(data_message_pairs))
names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
assert outputs == targets
yield storage, queue_manager, consumer
storage.clear_bucket(bucket_name)
def decode(storage_item):
storage_item = json.loads(gzip.decompress(storage_item).decode())
if not isinstance(storage_item, list):
storage_item = [storage_item]
yield from map(unpack, storage_item)
@pytest.fixture(params=["real", "mixed"])
def components_type(request):
return request.param
@pytest.fixture
def real_components(url):
callback = get_callback(url)
consumer = get_consumer(callback)
queue_manager = get_queue_manager()
storage = get_storage()
return storage, queue_manager, consumer
@pytest.fixture
def test_components(url, queue_manager, storage):
callback = get_callback(url)
visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
consumer = Consumer(visitor, queue_manager)
return storage, queue_manager, consumer

View File

@@ -9,7 +9,6 @@ def pair_data_with_queue_message(data: Iterable[bytes]):
"fileId": f"file{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
"pages": []
}
yield d, body