From 35542f994c9ad358d3967df36af309f9b8cdb595 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 18 May 2022 09:24:12 +0200 Subject: [PATCH] integration test for lazy pipeline --- test/integration_tests/serve_test.py | 39 ++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 test/integration_tests/serve_test.py diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py new file mode 100644 index 0000000..faa6164 --- /dev/null +++ b/test/integration_tests/serve_test.py @@ -0,0 +1,39 @@ +import gzip +import logging + +import pytest + +from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback +from pyinfra.visitor import get_object_descriptor + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize("item_type", ["pdf"]) +@pytest.mark.parametrize("one_to_many", [False]) +def test_serving(server_process, items, bucket_name, endpoint): + + callback = get_callback(endpoint) + visitor = get_visitor(callback) + queue_manager = get_queue_manager() + storage = get_storage() + consumer = get_consumer(callback) + + queue_manager.clear() + storage.clear_bucket(bucket_name) + + for data, message in items: + storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) + queue_manager.publish_request(message) + + reqs = consumer.consume(inactivity_timeout=5) + + for itm, req in zip(items, reqs): + logger.debug(f"Processing item {itm}") + queue_manager.publish_response(req, visitor) + + def decode(storage_item): + return gzip.decompress(storage_item).decode() + + print([*map(decode, storage.get_all_objects(bucket_name))]) + print([*storage.get_all_object_names(bucket_name)])