diff --git a/test/fixtures/input.py b/test/fixtures/input.py index f5e5e82..d68e1a6 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -3,7 +3,7 @@ from itertools import starmap, repeat import numpy as np import pytest from PIL import Image -from funcy import lmap, compose, flatten, lflatten +from funcy import lmap, compose, flatten, lflatten, omit from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.normalization import normalize_item @@ -60,13 +60,13 @@ def pdfs_to_bytes(unencoded_pdfs): @pytest.fixture -def target_data_items(input_data_items, core_operation): +def target_data_items(input_data_items, core_operation, metadata): if core_operation is Nothing: return Nothing op = compose(normalize_item, core_operation) - expected = lflatten(map(op, input_data_items)) + expected = lflatten(starmap(op, zip(input_data_items, metadata))) return expected @@ -134,7 +134,7 @@ def images_to_bytes(images): @pytest.fixture def metadata(n_items): - return list(repeat({"key": "value"}, n_items)) + return list(repeat({"key": "value", "pages": [0, 2, 3]}, n_items)) @pytest.fixture diff --git a/test/fixtures/server.py b/test/fixtures/server.py index c03d05f..1fef605 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -6,10 +6,11 @@ from multiprocessing import Process from typing import Generator import fitz +import funcy import pytest import requests from PIL import Image -from funcy import retry +from funcy import retry, project, omit from waitress import serve from pyinfra.server.dispatcher.dispatcher import Nothing @@ -63,7 +64,8 @@ def operation_conditionally_batched(operation, batched): def operation(core_operation): def op(data, metadata): assert isinstance(metadata, dict) - result = core_operation(data) + result = core_operation(data, metadata) + metadata = omit(metadata, ["pages", "operation"]) if isinstance(result, Generator): return zip(result, repeat(metadata)) else: @@ -76,21 +78,26 @@ def operation(core_operation): @pytest.fixture def core_operation(item_type, one_to_many, analysis_task): - def upper(string: bytes): + + def duplicate(string: bytes, metadata): + for _ in range(2): + yield upper(string, metadata) + + def upper(string: bytes, metadata): return string.decode().upper().encode() - def duplicate(string: bytes): - for _ in range(2): - yield upper(string) + def extract(string: bytes, metadata): + for c in project(dict(enumerate(string.decode())), metadata["pages"]).values(): + yield c.encode() - def rotate(im: bytes): + def rotate(im: bytes, metadata): im = Image.open(io.BytesIO(im)) return image_to_bytes(im.rotate(90)) - def classify(_: bytes): + def classify(_: bytes, metadata): return b"" - def stream_pages(pdf: bytes): + def stream_pages(pdf: bytes, metadata): for i, page in enumerate(fitz.open(stream=pdf)): # yield page.get_pixmap().tobytes("png"), metadata yield f"page_{i}".encode() @@ -101,7 +108,7 @@ def core_operation(item_type, one_to_many, analysis_task): "image": {False: rotate, True: classify}, }, True: { - "string": {False: duplicate}, + "string": {False: extract}, "pdf": {False: stream_pages}, }, } diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index ea7e158..df6f103 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -4,7 +4,7 @@ from itertools import starmap, repeat, chain from operator import itemgetter import pytest -from funcy import compose, lzip, lpluck +from funcy import compose, lpluck from pyinfra.default_objects import ( get_callback, @@ -14,53 +14,11 @@ from pyinfra.default_objects import ( get_storage, ) from pyinfra.queue.consumer import Consumer -from pyinfra.server.packing import bytes_to_string, unpack, pack +from pyinfra.server.packing import unpack, pack from pyinfra.visitor import get_object_descriptor, QueueVisitor from test.utils.input import pair_data_with_queue_message -@pytest.fixture -def test_components(url, queue_manager, storage): - - callback = get_callback(url) - visitor = QueueVisitor(storage, callback, get_response_strategy(storage)) - consumer = Consumer(visitor, queue_manager) - - return storage, queue_manager, consumer - - -@pytest.fixture -def real_components(url): - callback = get_callback(url) - consumer = get_consumer(callback) - queue_manager = get_queue_manager() - storage = get_storage() - return storage, queue_manager, consumer - - -@pytest.fixture -def components(components_type, real_components, test_components): - if components_type == "real": - return real_components - elif components_type == "test": - return test_components - else: - raise ValueError(f"Unknown components type '{components_type}'.") - - -@pytest.fixture(params=["real", "mixed"]) -def components_type(request): - return request.param - - -def decode(storage_item): - storage_item = json.loads(gzip.decompress(storage_item).decode()) - if not isinstance(storage_item, list): - storage_item = [storage_item] - - yield from map(unpack, storage_item) - - @pytest.mark.parametrize( "one_to_many", [ @@ -78,13 +36,6 @@ def decode(storage_item): @pytest.mark.parametrize("n_items", [1, 3]) @pytest.mark.parametrize("n_pages", [2]) @pytest.mark.parametrize("buffer_size", [2]) -@pytest.mark.parametrize( - "storage_item_has_metadata", - [ - True, - False, - ], -) @pytest.mark.parametrize( "item_type", [ @@ -117,41 +68,88 @@ def decode(storage_item): "real", ], ) -def test_serving( - server_process, - input_data_items, - metadata, - bucket_name, - components, - storage_item_has_metadata, - target_data_items, - targets, -): +def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items): + + storage, queue_manager, consumer = components + + upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs) + consumer.consume_and_publish(n=n_items) + outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name) + + targets = sorted(targets, key=itemgetter(0)) + assert outputs == targets + + +@pytest.fixture +def data_message_pairs(input_data_items, metadata): + data_metadata_packs = starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata)) + data_message_pairs = pair_data_with_queue_message(data_metadata_packs) + return data_message_pairs + + +def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs): + for data, message in data_message_pairs: + upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message) + + +def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message): + storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) + queue_manager.publish_request(message) + + +def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name): + names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list()) + uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files)) + outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0)) + return outputs + + +@pytest.fixture +def components(components_type, real_components, test_components, bucket_name): + if components_type == "real": + components = real_components + elif components_type == "test": + components = test_components + else: + raise ValueError(f"Unknown components type '{components_type}'.") + storage, queue_manager, consumer = components queue_manager.clear() storage.clear_bucket(bucket_name) - if storage_item_has_metadata: - data_metadata_packs = starmap(compose(lambda s: s.encode(), json.dumps, pack), zip(input_data_items, metadata)) - else: - data_metadata_packs = map(compose(lambda s: s.encode(), json.dumps, bytes_to_string), input_data_items) - metadata = repeat({}) - targets = lzip(target_data_items, metadata) - - targets = sorted(targets, key=itemgetter(0)) - - data_message_pairs = pair_data_with_queue_message(data_metadata_packs) - - for data, message in data_message_pairs: - storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) - queue_manager.publish_request(message) - - consumer.consume_and_publish(n=len(data_message_pairs)) - - names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list()) - uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files)) - outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0)) - assert outputs == targets + yield storage, queue_manager, consumer storage.clear_bucket(bucket_name) + + +def decode(storage_item): + storage_item = json.loads(gzip.decompress(storage_item).decode()) + if not isinstance(storage_item, list): + storage_item = [storage_item] + + yield from map(unpack, storage_item) + + +@pytest.fixture(params=["real", "mixed"]) +def components_type(request): + return request.param + + +@pytest.fixture +def real_components(url): + callback = get_callback(url) + consumer = get_consumer(callback) + queue_manager = get_queue_manager() + storage = get_storage() + return storage, queue_manager, consumer + + +@pytest.fixture +def test_components(url, queue_manager, storage): + + callback = get_callback(url) + visitor = QueueVisitor(storage, callback, get_response_strategy(storage)) + consumer = Consumer(visitor, queue_manager) + + return storage, queue_manager, consumer diff --git a/test/utils/input.py b/test/utils/input.py index 40a1e0f..daf754c 100644 --- a/test/utils/input.py +++ b/test/utils/input.py @@ -9,7 +9,6 @@ def pair_data_with_queue_message(data: Iterable[bytes]): "fileId": f"file{i}", "targetFileExtension": "in.gz", "responseFileExtension": "out.gz", - "pages": [] } yield d, body