"""End-to-end tests for the queue manager, run against a consumer in a subprocess."""

import json
import logging
import time
from multiprocessing import Process

import pika
import pika.exceptions
import pytest

from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
from pyinfra.queue.queue_manager import QueueManager

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@pytest.fixture(scope="session")
def development_queue_manager(test_queue_config):
    """Yield a DevelopmentQueueManager used to publish requests and fetch responses.

    The heartbeat of the passed config is overridden to 7200 before the manager is built.
    On teardown the channel is closed; a connection already closed by the broker is tolerated.
    """
    # NOTE(review): this mutates the config object in place. `payload_processing_time` reads the same
    # attribute, so if both fixtures receive the same object its value depends on which fixture is
    # instantiated first -- confirm this ordering is intended.
    test_queue_config.rabbitmq_heartbeat = 7200
    manager = DevelopmentQueueManager(test_queue_config)
    yield manager

    logger.info("Tearing down development queue manager...")
    try:
        manager.close_channel()
    except pika.exceptions.ConnectionClosedByBroker:
        # Best effort: nothing left to close if the broker already dropped the connection.
        logger.debug("Connection was already closed by the broker.")


@pytest.fixture(scope="session")
def payload_processing_time(test_queue_config, offset=5):
    """Return the configured heartbeat plus `offset`, used as the simulated processing duration."""
    # FIXME: this implicitly tests the heartbeat when running the end-to-end test. There should be another way to test
    # this explicitly.
    return test_queue_config.rabbitmq_heartbeat + offset


@pytest.fixture(scope="session")
def payload_processor(response_payload, payload_processing_time, payload_processor_type):
    """Return the callable the consumer applies to each payload.

    "mock" selects a processor that sleeps for `payload_processing_time` and then returns
    `response_payload`; "failing" selects a processor that raises MemoryError.

    Raises:
        ValueError: if `payload_processor_type` is neither "mock" nor "failing".
    """

    def process(payload):
        time.sleep(payload_processing_time)
        return response_payload

    def process_with_failure(payload):
        raise MemoryError

    if payload_processor_type == "mock":
        return process
    if payload_processor_type == "failing":
        return process_with_failure
    # Fail at fixture setup instead of handing None to the consumer subprocess.
    raise ValueError(f"Invalid {payload_processor_type=}.")


@pytest.fixture(scope="session", autouse=True)
def start_queue_consumer(test_queue_config, payload_processor, sleep_seconds=5):
    """Run a QueueManager consumer in a separate process for the duration of the fixture.

    After starting the process the fixture sleeps `sleep_seconds` before yielding; on teardown the
    process is terminated.
    """

    def consume_queue():
        queue_manager.start_consuming(payload_processor)

    queue_manager = QueueManager(test_queue_config)
    consumer_process = Process(target=consume_queue)
    consumer_process.start()
    logger.info(f"Setting up consumer, waiting for {sleep_seconds}...")
    time.sleep(sleep_seconds)
    yield

    logger.info("Tearing down consumer...")
    consumer_process.terminate()


@pytest.fixture
def message_properties(message_headers):
    """Build the pika.BasicProperties matching the `message_headers` test parameter.

    A falsy value yields properties without headers; "X-TENANT-ID" yields a single
    X-TENANT-ID header with the value "redaction".

    Raises:
        ValueError: for any other `message_headers` value.
    """
    if not message_headers:
        return pika.BasicProperties(headers=None)
    if message_headers == "X-TENANT-ID":
        return pika.BasicProperties(headers={"X-TENANT-ID": "redaction"})
    raise ValueError(f"Invalid {message_headers=}.")


@pytest.mark.parametrize("x_tenant_id", [None])
class TestQueueManager:
    """Publish requests through the development queue manager and inspect the responses."""

    # FIXME: All tests here are wonky. This is due to the implementation of running the process-blocking queue_manager
    # in a subprocess. It is then very hard to interact directly with the subprocess. If you have a better idea, please
    # refactor; the tests here are insufficient to ensure the functionality of the queue manager!
    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_message_processing_does_not_block_heartbeat(
        self, development_queue_manager, payload, response_payload, payload_processing_time
    ):
        """The response to a request whose processing outlasts the heartbeat equals `response_payload`."""
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(payload)

        # Wait longer than the processor sleeps so the response has time to arrive.
        time.sleep(payload_processing_time + 10)

        _, _, body = development_queue_manager.get_response()
        result = json.loads(body)
        assert result == response_payload

    @pytest.mark.parametrize("message_headers", [None, "X-TENANT-ID"])
    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_queue_manager_forwards_message_headers(
        self,
        development_queue_manager,
        payload,
        response_payload,
        payload_processing_time,
        message_properties,
    ):
        """The headers of the response equal the headers the request was published with."""
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(payload, message_properties)

        time.sleep(payload_processing_time + 10)

        _, properties, _ = development_queue_manager.get_response()
        assert properties.headers == message_properties.headers

    # FIXME: It is not possible to test the behavior of the queue manager directly, since it is running in a separate
    # process. You require logging to see if the exception is handled correctly. Hence, this test is only useful for
    # development, but insufficient to guarantee the correct behavior.
    @pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session")
    def test_failed_message_processing_is_handled(
        self,
        development_queue_manager,
        payload,
        response_payload,
        payload_processing_time,
    ):
        """With a processor that raises, fetching the response yields an empty body."""
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(payload)

        time.sleep(payload_processing_time + 10)

        _, _, body = development_queue_manager.get_response()
        assert not body