import json from sys import stdout from time import sleep import pika import pytest from kn_utils.logging import logger from pyinfra.queue.manager import QueueManager logger.remove() logger.add(sink=stdout, level="DEBUG") def make_callback(process_time): def callback(x): sleep(process_time) return json.dumps({"status": "success"}) return callback @pytest.fixture(scope="session") def queue_manager(settings): settings.rabbitmq_heartbeat = 10 settings.connection_sleep = 5 queue_manager = QueueManager(settings) yield queue_manager @pytest.fixture def input_message(): return json.dumps({ "targetFilePath": "test/target.json.gz", "responseFilePath": "test/response.json.gz", }) @pytest.fixture def stop_message(): return "STOP" class TestQueueManager: def test_processing_of_several_messages(self, queue_manager, input_message, stop_message): queue_manager.purge_queues() for _ in range(2): queue_manager.publish_message_to_input_queue(input_message) queue_manager.publish_message_to_input_queue(stop_message) callback = make_callback(1) queue_manager.start_consuming(callback) for _ in range(2): response = queue_manager.get_message_from_output_queue() assert response is not None assert response[2] == b'{"status": "success"}' def test_all_headers_beginning_with_x_are_forwarded(self, queue_manager, input_message, stop_message): queue_manager.purge_queues() properties = pika.BasicProperties( headers={ "X-TENANT-ID": "redaction", "X-OTHER-HEADER": "other-header-value", "x-tenant_id": "tenant-id-value", "x_should_not_be_forwarded": "should-not-be-forwarded-value", } ) queue_manager.publish_message_to_input_queue(input_message, properties=properties) queue_manager.publish_message_to_input_queue(stop_message) callback = make_callback(0.2) queue_manager.start_consuming(callback) response = queue_manager.get_message_from_output_queue() assert response[2] == b'{"status": "success"}' assert response[1].headers["X-TENANT-ID"] == "redaction" assert response[1].headers["X-OTHER-HEADER"] == "other-header-value" assert response[1].headers["x-tenant_id"] == "tenant-id-value" assert "x_should_not_be_forwarded" not in response[1].headers def test_message_processing_does_not_block_heartbeat(self, queue_manager, input_message, stop_message): queue_manager.purge_queues() queue_manager.publish_message_to_input_queue(input_message) queue_manager.publish_message_to_input_queue(stop_message) callback = make_callback(15) queue_manager.start_consuming(callback) response = queue_manager.get_message_from_output_queue() assert response[2] == b'{"status": "success"}'