Merge in RR/pyinfra from image-prediction-v2-support to 2.0.0
Squashed commit of the following:
commit 37c536324e847357e86dd9b72d1e07ad792ed90f
Merge: 77d1db8 01bfb1d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 13:53:56 2022 +0200
Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into image-prediction-v2-support
commit 77d1db8e8630de8822c124eb39f4cd817ed1d3e1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 13:07:41 2022 +0200
add operation assignment via config if operation is not defined by caller
commit 36c8ca48a8c6151f713c093a23de110901ba6b02
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 10:33:34 2022 +0200
refactor nothing part 2
commit f6cd0ef986802554dd544b9b7a24073d3b3f05b5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 10:28:49 2022 +0200
refactor nothing
commit 1e70d49531e89613c70903be49290b94ee014f65
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 17:42:12 2022 +0200
enable docker-compose fixture
commit 9fee32cecdd120cfac3e065fb8ad2b4f37b49226
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 17:40:35 2022 +0200
added 'multi' key to actual operation configurations
commit 4287f6d9878dd361489b8490eafd06f81df472ce
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 16:56:12 2022 +0200
removed debug prints
commit 23a533e8f99222c7e598fb0864f65e9aa3508a3b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 16:31:50 2022 +0200
completed correcting / cleaning upload and download logic with regard to operations and ids. next: remove debug code
commit 33246d1ff94989d2ea70242c7ae2e58afa4d35c1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 14:37:17 2022 +0200
corrected / cleaned upload and download logic with regard to operations and ids
commit 7f2b4e882022c6843cb2f80df202caa495c54ee9
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Tue Jul 5 18:41:07 2022 +0200
partially decomplected file descriptor manager from concrete and non-generic descriptor code
commit 40b892da17670dae3b8eba1700877c1dcf219852
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Tue Jul 5 09:53:46 2022 +0200
typo
commit ec4fa8e6f4551ff1f8d4f78c484b7a260f274898
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Tue Jul 5 09:52:41 2022 +0200
typo
commit 701b43403c328161fd96a73ce388a66035cca348
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 17:26:53 2022 +0200
made adjustments for image classification with pyinfra 2.x; added related fixmes
commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 13:05:26 2022 +0200
removed obsolete imports
commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 11:47:12 2022 +0200
enable docker-compose fixture
commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 11:46:53 2022 +0200
renaming
commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 30 12:47:57 2022 +0200
refactoring: added cached pipeline factory
commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 29 13:47:08 2022 +0200
renaming
commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 29 13:25:03 2022 +0200
further refactored server setup code: moving and decomplecting
commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 29 12:53:09 2022 +0200
refactored server setup code: factored out and decoupled operation registry and prometheus summary registry
... and 6 more commits
243 lines
6.8 KiB
Python
243 lines
6.8 KiB
Python
import json
|
|
import re
|
|
from itertools import starmap, repeat, chain
|
|
from operator import itemgetter
|
|
|
|
import pytest
|
|
from funcy import compose, first, second, pluck, lflatten, lzip
|
|
|
|
from pyinfra.config import CONFIG
|
|
from pyinfra.default_objects import get_component_factory
|
|
from pyinfra.server.packing import unpack, pack
|
|
from pyinfra.utils.encoding import compress, decompress
|
|
from test.config import CONFIG as TEST_CONFIG
|
|
|
|
|
|
# End-to-end serving test: upload input payloads and publish request(s),
# let the consumer process them, then compare what it uploaded to storage
# against the expected `targets`.
# NOTE(review): most parametrized names (batched, one_to_many, analysis_task,
# n_pages, buffer_size, item_type, queue_manager_name, client_name,
# components_type) do not appear in the test signature — presumably they
# parametrize fixtures indirectly via conftest; confirm against conftest.
@pytest.mark.parametrize(
    "batched",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "one_to_many",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "analysis_task",
    [
        False,
        True,
    ],
)
@pytest.mark.parametrize(
    "n_items",
    [
        # 1,
        3,
    ],
)
@pytest.mark.parametrize("n_pages", [2])
@pytest.mark.parametrize("buffer_size", [1, 2])
@pytest.mark.parametrize(
    "item_type",
    [
        "string",
        "image",
        "pdf",
    ],
)
@pytest.mark.parametrize(
    "queue_manager_name",
    [
        # "mock",
        "pika",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "client_name",
    [
        "mock",
        # "s3",
        # "azure",
    ],
    scope="session",
)
@pytest.mark.parametrize(
    "components_type",
    [
        # "test",
        "real",
    ],
)
@pytest.mark.parametrize(
    "many_to_n",
    [
        True,
        False,
    ],
)
def test_serving(
    operation_name, server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n
):

    storage, queue_manager, consumer, file_descriptor_manager = components

    # Preconditions: the `components` fixture cleared queues and emptied the
    # bucket before yielding — verify we really start from a clean slate.
    assert queue_manager.input_queue.to_list() == []
    assert queue_manager.output_queue.to_list() == []
    assert [*storage.get_all_object_names(bucket_name)] == []

    if n_items:
        assert data_message_pairs

    if many_to_n:
        # Many inputs behind a single request: all payloads are uploaded into
        # one folder and a single combined message is published.
        upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
            storage, queue_manager, data_message_pairs, file_descriptor_manager
        )
    else:
        # One request message per uploaded payload.
        upload_data_to_storage_and_publish_requests_to_queue(
            storage, queue_manager, data_message_pairs, file_descriptor_manager
        )

    # many_to_n => exactly one combined request to consume; otherwise the
    # consumer must process one message per item.
    consumer.consume_and_publish(n=int(many_to_n) or n_items)

    outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)

    # TODO: correctness of target should be validated as well, since production has become non-trivial
    assert sorted(outputs) == sorted(targets)
|
|
|
|
|
|
@pytest.fixture
def data_metadata_packs(input_data_items, metadata):
    """Each (item, metadata) pair packed, JSON-serialized and UTF-8 encoded."""
    return [
        json.dumps(pack(item, meta)).encode()
        for item, meta in zip(input_data_items, metadata)
    ]
|
|
|
|
|
|
@pytest.fixture
def data_message_pairs(data_metadata_packs, queue_message_metadata):
    """Pair each packed payload with its queue-message metadata."""
    return list(zip(data_metadata_packs, queue_message_metadata))
|
|
|
|
|
|
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_requests_to_queue(
    storage, queue_manager, data_message_pairs, file_descriptor_manager
):
    """Upload every payload and publish one request message per pair."""
    for payload, request_message in data_message_pairs:
        upload_data_to_storage_and_publish_request_to_queue(
            storage,
            queue_manager,
            payload,
            request_message,
            file_descriptor_manager,
        )
|
|
|
|
|
|
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager):
    """Compress *data* into the input object slot described by *message*, then publish *message*."""
    descriptor = file_descriptor_manager.get_input_object_descriptor(message)
    storage.put_object(**descriptor, data=compress(data))
    queue_manager.publish_request(message)
|
|
|
|
|
|
# TODO: refactor body; too long and scripty
def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
    storage, queue_manager, data_message_pairs, file_descriptor_manager
):
    """Upload each payload under a per-page object name derived from the first
    message, then publish that first message as a single combined request."""
    assert data_message_pairs

    # The first message describes the whole upload; its "pages" entry lists
    # the per-page ids the individual payloads are stored under.
    ref_message = data_message_pairs[0][1]
    pages = ref_message["pages"]

    for (payload, _message), page in zip(data_message_pairs, pages):
        descriptor = file_descriptor_manager.get_input_object_descriptor({**ref_message, "id": page})
        # Rewrite the object name so each payload lands in its own page slot.
        descriptor["object_name"] = build_filepath(descriptor, page)

        storage.put_object(**descriptor, data=compress(payload))

    queue_manager.publish_request(ref_message)
|
|
|
|
|
|
def build_filepath(object_descriptor, page):
    """Return the descriptor's object name with its ``id:<n>`` path segment
    rewritten to reference *page*.

    Parameters:
        object_descriptor: mapping with an ``"object_name"`` key holding a
            "/"-separated path containing an ``id:<digits>`` segment.
        page: page identifier substituted into the ``id:`` segment.

    Returns:
        The rewritten object-name string (unchanged if no ``id:`` segment
        is present).
    """
    # \d+ (not \d): the original single-digit pattern left trailing digits
    # behind for multi-digit ids ("id:12" -> "id:<page>2").
    # The former split("/") / "/".join round-trip was a no-op and is dropped.
    return re.sub(r"id:\d+", f"id:{page}", object_descriptor["object_name"])
|
|
|
|
|
|
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
    """Fetch, decode and sort (by first element) everything the consumer
    uploaded, as referenced by the "response_files" of its output messages."""
    response_messages = queue_manager.output_queue.to_list()
    names_of_uploaded_files = lflatten(pluck("response_files", response_messages))
    stored_objects = (storage.get_object(bucket_name, name) for name in names_of_uploaded_files)
    decoded = chain.from_iterable(decode(obj) for obj in stored_objects)
    return sorted(decoded, key=itemgetter(0))
|
|
|
|
|
|
@pytest.fixture
def components(components_type, real_components, test_components, bucket_name):
    """Yield (storage, queue_manager, consumer, file_descriptor_manager),
    starting from cleared queues and an empty bucket, cleaning up afterwards."""
    available = {"real": real_components, "test": test_components}
    if components_type not in available:
        raise ValueError(f"Unknown component type '{components_type}'.")
    components = available[components_type]

    storage, queue_manager, consumer, file_descriptor_manager = components

    # Setup: clean slate — empty queues and an existing, empty bucket.
    queue_manager.clear()
    storage.make_bucket(bucket_name)
    storage.clear_bucket(bucket_name)

    yield storage, queue_manager, consumer, file_descriptor_manager

    # Teardown: leave no residue for the next parametrisation.
    queue_manager.clear()
    storage.clear_bucket(bucket_name)
|
|
|
|
|
|
def decode(storage_item):
    """Decompress and JSON-parse *storage_item*, then yield the unpacked form
    of each contained record (a non-list payload counts as a single record)."""
    payload = json.loads(decompress(storage_item).decode())
    records = payload if isinstance(payload, list) else [payload]
    yield from (unpack(record) for record in records)
|
|
|
|
|
|
@pytest.fixture(params=["real", "mixed"])
def components_type(request):
    """Which component wiring the `components` fixture should use."""
    # NOTE(review): "mixed" is not handled by `components` (only "real" and
    # "test" are) and would raise ValueError there; `test_serving` also
    # parametrizes "components_type" directly, which overrides this fixture.
    # Presumably this param list is stale — TODO confirm.
    return request.param
|
|
|
|
|
|
@pytest.fixture
def real_components(url):
    """Build (storage, queue_manager, consumer, file_descriptor_manager) from
    the real component factory, with the service config patched for tests."""

    # Patch the global service config before the factory reads it.
    CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
    CONFIG["service"]["response_formatter"] = TEST_CONFIG.service.response_formatter
    CONFIG["service"]["upload_formatter"] = "identity"

    factory = get_component_factory(CONFIG)

    # The consumer wraps a callback pointed at the serving URL.
    consumer = factory.get_consumer(factory.get_callback(url))
    queue_manager = factory.get_queue_manager()
    storage = factory.get_storage()
    file_descriptor_manager = factory.get_file_descriptor_manager()

    return storage, queue_manager, consumer, file_descriptor_manager
|
|
|
|
|
|
@pytest.fixture
def test_components(url, queue_manager, storage):
    """Stub for a test-double component set; currently returns None.

    NOTE(review): the `components` fixture unpacks this value as a 4-tuple
    when components_type == "test", so selecting "test" will fail until this
    is implemented (the sketch below is kept for reference) — TODO confirm.
    """
    pass
    #
    # component_factory = ComponentFactory(CONFIG)
    #
    # file_descriptor_manager = component_factory.get_file_descriptor_manager()
    #
    # visitor = QueueVisitor(
    #     storage=storage,
    #     callback=component_factory.get_callback(url),
    #     response_strategy=component_factory.get_response_strategy(storage),
    # )
    # consumer = Consumer(visitor, queue_manager)
    #
    # return storage, queue_manager, consumer, file_descriptor_manager
|