From 64871bbb6280423a306ffba7ddebf28850053c4c Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 15 Jan 2024 10:30:07 +0100 Subject: [PATCH] refactor: add basic queue manager test --- tests/tests_with_docker_compose/queue_test.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 tests/tests_with_docker_compose/queue_test.py diff --git a/tests/tests_with_docker_compose/queue_test.py b/tests/tests_with_docker_compose/queue_test.py new file mode 100644 index 0000000..d293eef --- /dev/null +++ b/tests/tests_with_docker_compose/queue_test.py @@ -0,0 +1,38 @@ +import gzip +import json +from multiprocessing import Process +from time import sleep + +from kn_utils.logging import logger + +from pyinfra.queue.development_queue_manager import DevelopmentQueueManager +from pyinfra.queue.queue_manager import QueueManager + + +class TestQueueManager: + def test_basic_functionality(self, settings): + settings.rabbitmq_heartbeat = 7200 + development_queue_manager = DevelopmentQueueManager(settings) + + message = { + "targetFilePath": "test/target.json.gz", + "responseFilePath": "test/response.json.gz", + } + + development_queue_manager.publish_request(message) + + queue_manager = QueueManager(settings) + + consume = lambda: queue_manager.start_consuming(lambda x: x) + p = Process(target=consume) + p.start() + + wait_time = 1 + logger.info(f"Waiting {wait_time} seconds for the consumer to process the message...") + sleep(wait_time) + + p.kill() + + response = development_queue_manager.get_response() + + print(response)