diff --git a/tests/tests_with_docker_compose/queue_test.py b/tests/tests_with_docker_compose/queue_test.py new file mode 100644 index 0000000..d293eef --- /dev/null +++ b/tests/tests_with_docker_compose/queue_test.py @@ -0,0 +1,38 @@ +import gzip +import json +from multiprocessing import Process +from time import sleep + +from kn_utils.logging import logger + +from pyinfra.queue.development_queue_manager import DevelopmentQueueManager +from pyinfra.queue.queue_manager import QueueManager + + +class TestQueueManager: + def test_basic_functionality(self, settings): + settings.rabbitmq_heartbeat = 7200 + development_queue_manager = DevelopmentQueueManager(settings) + + message = { + "targetFilePath": "test/target.json.gz", + "responseFilePath": "test/response.json.gz", + } + + development_queue_manager.publish_request(message) + + queue_manager = QueueManager(settings) + + consume = lambda: queue_manager.start_consuming(lambda x: x) + p = Process(target=consume) + p.start() + + wait_time = 1 + logger.info(f"Waiting {wait_time} seconds for the consumer to process the message...") + sleep(wait_time) + + p.kill() + + response = development_queue_manager.get_response() + + print(response)