refactoring
This commit is contained in:
parent
54ca81d577
commit
426967ee46
@ -3,19 +3,32 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from funcy import notnone
|
from funcy import notnone, filter, lfilter
|
||||||
|
|
||||||
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
|
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
|
||||||
from pyinfra.server.packing import bytes_to_string, string_to_bytes
|
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||||
|
from pyinfra.server.packing import string_to_bytes
|
||||||
from pyinfra.visitor import get_object_descriptor
|
from pyinfra.visitor import get_object_descriptor
|
||||||
from test.utils.input import adorn_data_with_storage_info
|
from test.utils.input import adorn_data_with_storage_info
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("item_type", ["string"])
|
|
||||||
@pytest.mark.parametrize("one_to_many", [True])
|
@pytest.mark.parametrize("one_to_many", [True])
|
||||||
def test_serving(server_process, input_data_items, bucket_name, endpoint):
|
@pytest.mark.parametrize("analysis_task", [False])
|
||||||
|
@pytest.mark.parametrize("n_items", [1])
|
||||||
|
@pytest.mark.parametrize("n_pages", [1])
|
||||||
|
@pytest.mark.parametrize("buffer_size", [2])
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"item_type",
|
||||||
|
[
|
||||||
|
# "string",
|
||||||
|
"image",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_serving(server_process, input_data_items, bucket_name, endpoint, core_operation):
|
||||||
|
print()
|
||||||
|
print(core_operation.__name__)
|
||||||
|
|
||||||
callback = get_callback(endpoint)
|
callback = get_callback(endpoint)
|
||||||
visitor = get_visitor(callback)
|
visitor = get_visitor(callback)
|
||||||
@ -36,16 +49,22 @@ def test_serving(server_process, input_data_items, bucket_name, endpoint):
|
|||||||
|
|
||||||
for itm, req in zip(items, reqs):
|
for itm, req in zip(items, reqs):
|
||||||
logger.debug(f"Processing item {itm}")
|
logger.debug(f"Processing item {itm}")
|
||||||
|
print(f"Processing item")
|
||||||
queue_manager.publish_response(req, visitor)
|
queue_manager.publish_response(req, visitor)
|
||||||
|
|
||||||
def decode(storage_item):
|
def decode(storage_item):
|
||||||
repr = gzip.decompress(storage_item).decode().replace(r'\"', "'").replace('"', "").replace("'", '"')
|
repr = gzip.decompress(storage_item).decode().replace(r"\"", "'").replace('"', "").replace("'", '"')
|
||||||
try:
|
try:
|
||||||
return json.loads(repr)
|
return json.loads(repr)
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
print()
|
|
||||||
for storage_item in [*filter(notnone, map(decode, storage.get_all_objects(bucket_name)))]:
|
print(list(storage.get_all_object_names(bucket_name)))
|
||||||
|
names_of_uploaded_files = lfilter(".out", storage.get_all_object_names(bucket_name))
|
||||||
|
uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files]
|
||||||
|
print(names_of_uploaded_files)
|
||||||
|
|
||||||
|
for storage_item in [*map(decode, uploaded_files)]:
|
||||||
storage_item["data"] = string_to_bytes(storage_item["data"])
|
storage_item["data"] = string_to_bytes(storage_item["data"])
|
||||||
print(storage_item)
|
print(storage_item)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user