Matthias Bisping 426967ee46 refactoring
2022-05-23 11:39:30 +02:00

71 lines
2.3 KiB
Python

import gzip
import json
import logging
import pytest
from funcy import notnone, filter, lfilter
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.packing import string_to_bytes
from pyinfra.visitor import get_object_descriptor
from test.utils.input import adorn_data_with_storage_info
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@pytest.mark.parametrize("one_to_many", [True])
@pytest.mark.parametrize("analysis_task", [False])
@pytest.mark.parametrize("n_items", [1])
@pytest.mark.parametrize("n_pages", [1])
@pytest.mark.parametrize("buffer_size", [2])
@pytest.mark.parametrize(
    "item_type",
    [
        # "string",
        "image",
    ],
)
def test_serving(server_process, input_data_items, bucket_name, endpoint, core_operation):
    """Push input items through storage and the queue, then read back the outputs.

    Steps performed:

    1. Clear the queue and the bucket.
    2. Upload every input item gzip-compressed and publish its request message.
    3. Consume the requests and publish a response for each via the visitor.
    4. Fetch every object whose name contains ``.out``, decode it and convert
       its ``"data"`` field with ``string_to_bytes``.

    All arguments are expected to be supplied by pytest (fixtures or the
    parametrizations above). ``server_process`` is not referenced in the body;
    presumably it is requested only so that the server is running -- confirm
    against the fixture definition.
    """
    print()
    print(core_operation.__name__)

    callback = get_callback(endpoint)
    visitor = get_visitor(callback)
    queue_manager = get_queue_manager()
    storage = get_storage()
    consumer = get_consumer(callback)

    # Start from a clean slate so earlier runs cannot leak into this one.
    queue_manager.clear()
    storage.clear_bucket(bucket_name)

    # Iterated twice below (upload loop and response loop), so materialise it
    # in case the helper returns a one-shot iterator.
    items = list(adorn_data_with_storage_info(input_data_items))

    for data, message in items:
        storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
        queue_manager.publish_request(message)

    reqs = consumer.consume(inactivity_timeout=5)

    for itm, req in zip(items, reqs):
        logger.debug("Processing item %s", itm)
        print("Processing item")
        queue_manager.publish_response(req, visitor)

    def decode(storage_item):
        """Gunzip a stored object and parse it as JSON.

        Returns the parsed object, or ``None`` (after logging a warning) when
        the normalised text is not valid JSON.
        """
        # Quote normalisation kept exactly as before: escaped double quotes
        # become single quotes, remaining double quotes are dropped, and single
        # quotes are then turned into double quotes.
        text = gzip.decompress(storage_item).decode().replace(r"\"", "'").replace('"', "").replace("'", '"')
        try:
            return json.loads(text)
        except json.decoder.JSONDecodeError:
            logger.warning("Stored object is not valid JSON after normalisation: %.200s", text)
            return None

    # List the bucket once and reuse the result for printing and filtering.
    object_names = list(storage.get_all_object_names(bucket_name))
    print(object_names)

    # funcy treats a string predicate as a regular expression, so the dot must
    # be escaped to match a literal ".out" rather than "<any char>out".
    names_of_uploaded_files = lfilter(r"\.out", object_names)
    uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files]
    print(names_of_uploaded_files)

    for name, raw in zip(names_of_uploaded_files, uploaded_files):
        storage_item = decode(raw)
        # Fail with a clear message instead of a TypeError on None["data"].
        assert storage_item is not None, f"Uploaded object {name!r} could not be decoded as JSON"
        storage_item["data"] = string_to_bytes(storage_item["data"])
        print(storage_item)