integration test for lazy pipeline
This commit is contained in:
parent
fb712af7c6
commit
35542f994c
39
test/integration_tests/serve_test.py
Normal file
39
test/integration_tests/serve_test.py
Normal file
@ -0,0 +1,39 @@
|
||||
import gzip
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
|
||||
from pyinfra.visitor import get_object_descriptor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("item_type", ["pdf"])
|
||||
@pytest.mark.parametrize("one_to_many", [False])
|
||||
def test_serving(server_process, items, bucket_name, endpoint):
|
||||
|
||||
callback = get_callback(endpoint)
|
||||
visitor = get_visitor(callback)
|
||||
queue_manager = get_queue_manager()
|
||||
storage = get_storage()
|
||||
consumer = get_consumer(callback)
|
||||
|
||||
queue_manager.clear()
|
||||
storage.clear_bucket(bucket_name)
|
||||
|
||||
for data, message in items:
|
||||
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
|
||||
queue_manager.publish_request(message)
|
||||
|
||||
reqs = consumer.consume(inactivity_timeout=5)
|
||||
|
||||
for itm, req in zip(items, reqs):
|
||||
logger.debug(f"Processing item {itm}")
|
||||
queue_manager.publish_response(req, visitor)
|
||||
|
||||
def decode(storage_item):
|
||||
return gzip.decompress(storage_item).decode()
|
||||
|
||||
print([*map(decode, storage.get_all_objects(bucket_name))])
|
||||
print([*storage.get_all_object_names(bucket_name)])
|
||||
Loading…
x
Reference in New Issue
Block a user