90 lines
2.6 KiB
Python
90 lines
2.6 KiB
Python
import gzip
|
|
import json
|
|
import logging
|
|
from itertools import starmap, repeat
|
|
|
|
import pytest
|
|
from funcy import lfilter, lmap, compose, lzip
|
|
|
|
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
|
|
from pyinfra.server.packing import bytes_to_string, unpack, pack
|
|
from pyinfra.visitor import get_object_descriptor
|
|
from test.utils.input import adorn_data_with_storage_info
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _decode_storage_item(storage_item):
    """Turn one gzip-compressed stored response into a ``(data, metadata)`` pair.

    The decompressed text is normalised before JSON parsing: escaped double
    quotes (``\\"``) become single quotes, all remaining double quotes are
    dropped, and finally every single quote becomes a double quote.
    NOTE(review): this looks like it unwraps a JSON document that was stored
    as a JSON-encoded string (double encoding) -- confirm against the writer.
    """
    text = gzip.decompress(storage_item).decode().replace(r"\"", "'").replace('"', "").replace("'", '"')
    return unpack(json.loads(text))


@pytest.mark.parametrize("one_to_many", [False, True])
@pytest.mark.parametrize("analysis_task", [False])
@pytest.mark.parametrize("n_items", [2])
@pytest.mark.parametrize("n_pages", [1])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize("storage_item_has_metadata", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_serving(
    server_process,
    input_data_items,
    metadata,
    bucket_name,
    endpoint,
    core_operation,
    storage_item_has_metadata,
    target_data_items,
    targets,
):
    """Round-trip input items through storage, the request queue and the visitor.

    Steps:
      1. Clear the queue and the bucket.
      2. Serialise every input item (packed together with its metadata when
         ``storage_item_has_metadata`` is true), gzip it, store it and publish
         the accompanying request message.
      3. Consume the requests and publish a response for each via the visitor.
      4. Decode every stored object whose name contains ``.out`` and compare
         the decoded ``(data, metadata)`` pairs with the expected targets.
    """
    callback = get_callback(endpoint)
    visitor = get_visitor(callback)
    queue_manager = get_queue_manager()
    storage = get_storage()
    consumer = get_consumer(callback)

    queue_manager.clear()
    storage.clear_bucket(bucket_name)

    if storage_item_has_metadata:
        # compose applies right-to-left: pack(data, metadata) -> JSON text -> bytes.
        serialize = compose(str.encode, json.dumps, pack)
        data_metadata_packs = starmap(serialize, zip(input_data_items, metadata))
    else:
        serialize = compose(str.encode, json.dumps, bytes_to_string)
        data_metadata_packs = map(serialize, input_data_items)
        # Without stored metadata every expected item is paired with an empty dict.
        targets = [(item, {}) for item in target_data_items]

    # Materialise the pairs: they are iterated twice below, and a lazy iterator
    # would already be exhausted when the second loop starts.
    adorned_data_metadata_packs = list(adorn_data_with_storage_info(data_metadata_packs))

    for data, message in adorned_data_metadata_packs:
        storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
        queue_manager.publish_request(message)

    reqs = consumer.consume(inactivity_timeout=5)

    # Zipping against the published items stops after one response per request
    # instead of pulling from the consumer until its inactivity timeout.
    for _, req in zip(adorned_data_metadata_packs, reqs):
        queue_manager.publish_response(req, visitor)

    # funcy treats a string predicate as a regex; the dot is escaped so that only
    # names containing a literal ".out" are selected.
    names_of_uploaded_files = lfilter(r"\.out", storage.get_all_object_names(bucket_name))
    uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files]

    outputs = lmap(_decode_storage_item, uploaded_files)
    logger.debug("outputs: %s", outputs)
    logger.debug("targets: %s", targets)
    assert outputs == targets
|