"""Tests for the queue manager's message consumption and publishing."""

import json
from sys import stdout
from time import sleep

import pika
from kn_utils.logging import logger

# Send log output to stdout at DEBUG level. remove() is called first,
# presumably to drop any previously registered sinks -- confirm against
# kn_utils.logging.
logger.remove()
logger.add(sink=stdout, level="DEBUG")


def make_callback(process_time):
    """Build a callback that simulates ``process_time`` seconds of work.

    Args:
        process_time: Number of seconds the returned callback sleeps before
            returning.

    Returns:
        A one-argument callable that ignores its argument, sleeps for
        ``process_time`` seconds and returns ``{"status": "success"}``.
    """

    def callback(x):
        # The argument is deliberately ignored; only the delay matters.
        sleep(process_time)
        return {"status": "success"}

    return callback


def file_not_found_callback(x):
    """Callback that always fails, simulating an unavailable file.

    Raises:
        FileNotFoundError: Always, regardless of the argument.
    """
    raise FileNotFoundError("File not found")


class TestQueueManager:
    """Tests driving a queue manager through the ``queue_manager`` fixture.

    Every test purges the queues first and publishes ``stop_message`` after
    its input messages (presumably this is what makes ``start_consuming``
    return -- confirm against the queue manager implementation).
    """

    def test_not_available_file_leads_to_message_rejection_without_crashing(
        self, queue_manager, input_message, stop_message
    ):
        """Consuming with a callback that raises must not propagate the error.

        There is no explicit assertion: the test passes as long as
        ``start_consuming`` returns without raising.
        """
        queue_manager.purge_queues()

        queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        queue_manager.start_consuming(file_not_found_callback)

    def test_processing_of_several_messages(self, queue_manager, input_message, stop_message):
        """Two input messages must yield two success responses."""
        queue_manager.purge_queues()

        for _ in range(2):
            queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        callback = make_callback(1)
        queue_manager.start_consuming(callback)

        for _ in range(2):
            response = queue_manager.get_message_from_output_queue()
            assert response is not None
            # response[2] is the raw message body (bytes holding JSON).
            assert json.loads(response[2].decode()) == {"status": "success"}

    def test_all_headers_beginning_with_x_are_forwarded(self, queue_manager, input_message, stop_message):
        """Headers prefixed ``X-``/``x-`` must appear on the output message.

        The ``x_``-prefixed header (underscore instead of hyphen) must not.
        """
        queue_manager.purge_queues()

        properties = pika.BasicProperties(
            headers={
                "X-TENANT-ID": "redaction",
                "X-OTHER-HEADER": "other-header-value",
                "x-tenant_id": "tenant-id-value",
                "x_should_not_be_forwarded": "should-not-be-forwarded-value",
            }
        )

        queue_manager.publish_message_to_input_queue(input_message, properties=properties)
        queue_manager.publish_message_to_input_queue(stop_message)

        callback = make_callback(0.2)
        queue_manager.start_consuming(callback)

        response = queue_manager.get_message_from_output_queue()
        # Fail with a clear assertion rather than a TypeError on indexing
        # when nothing was published to the output queue.
        assert response is not None

        assert json.loads(response[2].decode()) == {"status": "success"}

        # response[1] carries the message properties, including headers.
        headers = response[1].headers
        assert headers["X-TENANT-ID"] == "redaction"
        assert headers["X-OTHER-HEADER"] == "other-header-value"
        assert headers["x-tenant_id"] == "tenant-id-value"

        assert "x_should_not_be_forwarded" not in headers

    def test_message_processing_does_not_block_heartbeat(self, queue_manager, input_message, stop_message):
        """A slow callback must still produce a success response."""
        queue_manager.purge_queues()

        queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        # 15 s of simulated work; presumably longer than the connection's
        # heartbeat interval -- confirm against the fixture configuration.
        callback = make_callback(15)
        queue_manager.start_consuming(callback)

        response = queue_manager.get_message_from_output_queue()
        # Fail with a clear assertion rather than a TypeError on indexing
        # when nothing was published to the output queue.
        assert response is not None

        assert json.loads(response[2].decode()) == {"status": "success"}