From 27917863c90cb05a7a80e9921bfdcde16e0e4e75 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 16 Jan 2024 14:21:41 +0100 Subject: [PATCH] refactor: finnish queue manager, queue manager tests, also add validation logic, integrate new settings --- pyinfra/k8s_probes/__init__.py | 0 pyinfra/k8s_probes/startup.py | 36 ----- .../queue/{queue_manager.py => manager.py} | 8 +- .../queue_manager_test.py | 123 ------------------ tests/tests_with_docker_compose/queue_test.py | 6 +- 5 files changed, 8 insertions(+), 165 deletions(-) delete mode 100644 pyinfra/k8s_probes/__init__.py delete mode 100644 pyinfra/k8s_probes/startup.py rename pyinfra/queue/{queue_manager.py => manager.py} (96%) delete mode 100644 tests/tests_with_docker_compose/queue_manager_test.py diff --git a/pyinfra/k8s_probes/__init__.py b/pyinfra/k8s_probes/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyinfra/k8s_probes/startup.py b/pyinfra/k8s_probes/startup.py deleted file mode 100644 index 9a8a183..0000000 --- a/pyinfra/k8s_probes/startup.py +++ /dev/null @@ -1,36 +0,0 @@ -import sys -from kn_utils.logging import logger -from pathlib import Path - -from pyinfra.queue.queue_manager import token_file_name - - -def check_token_file(): - """ - Checks if the token file of the QueueManager exists and is not empty, i.e. the queue manager has been started. - - NOTE: This function suppresses all Exception's. - - Returns True if the queue manager has been started, False otherwise - """ - - try: - token_file_path = Path(token_file_name()) - - if token_file_path.exists(): - with token_file_path.open(mode="r", encoding="utf8") as token_file: - contents = token_file.read().strip() - - return contents != "" - # We intentionally do not handle exception here, since we're only using this in a short script. - # Take care to expand this if the intended use changes - except Exception as err: - logger.warning(f"{err}: Caught exception when reading from token file", exc_info=True) - return False - - -def run_checks(): - if check_token_file(): - sys.exit(0) - else: - sys.exit(1) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/manager.py similarity index 96% rename from pyinfra/queue/queue_manager.py rename to pyinfra/queue/manager.py index ecf6242..5c5e238 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/manager.py @@ -18,6 +18,8 @@ from pyinfra.utils.config_validation import validate_settings, queue_manager_val pika_logger = logging.getLogger("pika") pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter +MessageProcessor = Callable[[dict], dict] + class QueueManager: def __init__(self, settings: Dynaconf): @@ -129,7 +131,7 @@ class QueueManager: self.establish_connection() return self.channel.basic_get(self.output_queue, auto_ack=True) - def _make_on_message_callback(self, message_processor: Callable): + def _make_on_message_callback(self, message_processor: MessageProcessor): def process_message_body_and_await_result(unpacked_message_body): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor: logger.debug("Processing payload in separate thread.") @@ -163,7 +165,9 @@ class QueueManager: else {} ) logger.debug(f"Processing message with {filtered_message_headers=}.") - result = process_message_body_and_await_result({**json.loads(body), **filtered_message_headers}) + result: dict = ( + process_message_body_and_await_result({**json.loads(body), **filtered_message_headers}) or {} + ) channel.basic_publish( "", diff --git a/tests/tests_with_docker_compose/queue_manager_test.py b/tests/tests_with_docker_compose/queue_manager_test.py deleted file mode 100644 index d6c9118..0000000 --- a/tests/tests_with_docker_compose/queue_manager_test.py +++ /dev/null @@ -1,123 +0,0 @@ -import json -import logging -import time -from multiprocessing import Process - -import pika -import pika.exceptions -import pytest - -from pyinfra.queue.development_queue_manager import DevelopmentQueueManager -from pyinfra.queue.queue_manager import QueueManager - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) - - -@pytest.fixture(scope="session") -def development_queue_manager(test_queue_config): - test_queue_config.rabbitmq_heartbeat = 7200 - development_queue_manager = DevelopmentQueueManager(test_queue_config) - yield development_queue_manager - logger.info("Tearing down development queue manager...") - try: - development_queue_manager.close_channel() - except pika.exceptions.ConnectionClosedByBroker: - pass - - -@pytest.fixture(scope="session") -def payload_processing_time(test_queue_config, offset=5): - # FIXME: this implicitly tests the heartbeat when running the end-to-end test. There should be another way to test - # this explicitly. - return test_queue_config.rabbitmq_heartbeat + offset - - -@pytest.fixture(scope="session") -def payload_processor(response_payload, payload_processing_time, payload_processor_type): - def process(payload): - time.sleep(payload_processing_time) - return response_payload - - def process_with_failure(payload): - raise MemoryError - - if payload_processor_type == "mock": - return process - elif payload_processor_type == "failing": - return process_with_failure - - -@pytest.fixture(scope="session", autouse=True) -def start_queue_consumer(test_queue_config, payload_processor, sleep_seconds=5): - def consume_queue(): - queue_manager.start_consuming(payload_processor) - - queue_manager = QueueManager(test_queue_config) - p = Process(target=consume_queue) - p.start() - logger.info(f"Setting up consumer, waiting for {sleep_seconds}...") - time.sleep(sleep_seconds) - yield - logger.info("Tearing down consumer...") - p.terminate() - - -@pytest.fixture -def message_properties(message_headers): - if not message_headers: - return pika.BasicProperties(headers=None) - elif message_headers == "X-TENANT-ID": - return pika.BasicProperties(headers={"X-TENANT-ID": "redaction"}) - else: - raise Exception(f"Invalid {message_headers=}.") - - -@pytest.mark.parametrize("x_tenant_id", [None]) -class TestQueueManager: - # FIXME: All tests here are wonky. This is due to the implementation of running the process-blocking queue_manager - # in a subprocess. It is then very hard to interact directly with the subprocess. If you have a better idea, please - # refactor; the tests here are insufficient to ensure the functionality of the queue manager! - @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session") - def test_message_processing_does_not_block_heartbeat( - self, development_queue_manager, payload, response_payload, payload_processing_time - ): - development_queue_manager.clear_queues() - development_queue_manager.publish_request(payload) - time.sleep(payload_processing_time + 10) - _, _, body = development_queue_manager.get_response() - result = json.loads(body) - assert result == response_payload - - @pytest.mark.parametrize("message_headers", [None, "X-TENANT-ID"]) - @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session") - def test_queue_manager_forwards_message_headers( - self, - development_queue_manager, - payload, - response_payload, - payload_processing_time, - message_properties, - ): - development_queue_manager.clear_queues() - development_queue_manager.publish_request(payload, message_properties) - time.sleep(payload_processing_time + 10) - _, properties, _ = development_queue_manager.get_response() - assert properties.headers == message_properties.headers - - # FIXME: It is not possible to test the behavior of the queue manager directly, since it is running in a separate - # process. You require logging to see if the exception is handled correctly. Hence, this test is only useful for - # development, but insufficient to guarantee the correct behavior. - @pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session") - def test_failed_message_processing_is_handled( - self, - development_queue_manager, - payload, - response_payload, - payload_processing_time, - ): - development_queue_manager.clear_queues() - development_queue_manager.publish_request(payload) - time.sleep(payload_processing_time + 10) - _, _, body = development_queue_manager.get_response() - assert not body diff --git a/tests/tests_with_docker_compose/queue_test.py b/tests/tests_with_docker_compose/queue_test.py index 43d6096..449cf30 100644 --- a/tests/tests_with_docker_compose/queue_test.py +++ b/tests/tests_with_docker_compose/queue_test.py @@ -6,7 +6,7 @@ import pika import pytest from kn_utils.logging import logger -from pyinfra.queue.queue_manager import QueueManager +from pyinfra.queue.manager import QueueManager logger.remove() logger.add(sink=stdout, level="DEBUG") @@ -15,7 +15,7 @@ logger.add(sink=stdout, level="DEBUG") def make_callback(process_time): def callback(x): sleep(process_time) - return json.dumps({"status": "success"}).encode("utf-8") + return json.dumps({"status": "success"}) return callback @@ -57,7 +57,6 @@ class TestQueueManager: response = queue_manager.get_message_from_output_queue() assert response is not None assert response[2] == b'{"status": "success"}' - print(response) def test_all_headers_beginning_with_x_are_forwarded(self, queue_manager, input_message, stop_message): queue_manager.purge_queues() @@ -78,7 +77,6 @@ class TestQueueManager: queue_manager.start_consuming(callback) response = queue_manager.get_message_from_output_queue() - print(response) assert response[2] == b'{"status": "success"}'