From fc1f23a24da71edd3c874434fd3b4d6db9a2c271 Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 11:26:15 +0200 Subject: [PATCH 1/7] RED-4653: Corrected signal handler by correctly handling passed params --- pyinfra/queue/queue_manager.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index e1d4543..135ea5e 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -7,7 +7,7 @@ from typing import Callable import pika import pika.exceptions -from pyinfra.config import get_config, Config +from pyinfra.config import Config pika_logger = logging.getLogger("pika") pika_logger.setLevel(logging.WARNING) @@ -34,8 +34,8 @@ class QueueManager(object): connection_params = get_connection_params(config) atexit.register(self.stop_consuming) - signal.signal(signal.SIGTERM, self.stop_consuming) - signal.signal(signal.SIGINT, self.stop_consuming) + signal.signal(signal.SIGTERM, self._handle_stop_signal) + signal.signal(signal.SIGINT, self._handle_stop_signal) self._connection = pika.BlockingConnection(parameters=connection_params) self._channel = self._connection.channel() @@ -75,6 +75,10 @@ class QueueManager(object): self._consumer_token = None + def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs): + self.logger.info(f"Received signal {signal_number}") + self.stop_consuming() + def _create_queue_callback(self, process_message_callback: Callable): def callback(_channel, frame, properties, body): self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}") From 5cdf4df4a3e2e103ef7a6ce4c899eaf68d9c5690 Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 12:25:27 +0200 Subject: [PATCH 2/7] RED-4653: Switch to closing channel instead of only cancelling subscription on shutdown. Changed queue-consumption shutdown to close the channel before closing the connection, since only cancelling the consumers doesn't clean-up the channel correctly, which in turn can cause an error when closing the connection. Also reordered the code so that the connection and channel are only opened when queue-consumption starts. --- pyinfra/queue/queue_manager.py | 37 ++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 135ea5e..c961de9 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -31,30 +31,38 @@ def _get_n_previous_attempts(props): class QueueManager(object): def __init__(self, config: Config): - connection_params = get_connection_params(config) + self.logger = logging.getLogger("queue_manager") + self.logger.setLevel(config.logging_level_root) + + self._consumer_token = None + + self._connection_params = get_connection_params(config) + + self._input_queue = config.request_queue + self._output_queue = config.response_queue + self._dead_letter_queue = config.dead_letter_queue atexit.register(self.stop_consuming) signal.signal(signal.SIGTERM, self._handle_stop_signal) signal.signal(signal.SIGINT, self._handle_stop_signal) - self._connection = pika.BlockingConnection(parameters=connection_params) + def _open_channel(self): + self._connection = pika.BlockingConnection(parameters=self._connection_params) self._channel = self._connection.channel() self._channel.basic_qos(prefetch_count=1) - args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": config.dead_letter_queue} - - self._input_queue = config.request_queue - self._output_queue = config.response_queue + args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._dead_letter_queue} self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True) self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True) - self._consumer_token = None - - self.logger = logging.getLogger("queue_manager") - self.logger.setLevel(config.logging_level_root) + def _close_channel(self): + self._channel.close() + self._connection.close() def start_consuming(self, process_message_callback: Callable): + self._open_channel() + callback = self._create_queue_callback(process_message_callback) self.logger.info("Consuming from queue") @@ -63,18 +71,21 @@ class QueueManager(object): self._consumer_token = self._channel.basic_consume(self._input_queue, callback) self.logger.info(f"Registered with consumer-tag: {self._consumer_token}") self._channel.start_consuming() + except Exception as ex: + self.logger.warning( + f"An unexpected exception occurred while consuming messages. Consuming will stop.\n{ex}" + ) finally: - self.logger.warning("An unhandled exception occurred while consuming messages. Consuming will stop.") self.stop_consuming() def stop_consuming(self): if self._consumer_token and self._connection: self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}") self._channel.basic_cancel(self._consumer_token) - self._connection.close() - self._consumer_token = None + self._close_channel() + def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs): self.logger.info(f"Received signal {signal_number}") self.stop_consuming() From bbf013385ab75c5c02cc0f39f7db2896bb98653a Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 13:17:01 +0200 Subject: [PATCH 3/7] RED-4653: Corrected exception block to not swallow exceptions --- pyinfra/queue/queue_manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index c961de9..7762ffc 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -67,14 +67,16 @@ class QueueManager(object): self.logger.info("Consuming from queue") self._consumer_token = None + try: self._consumer_token = self._channel.basic_consume(self._input_queue, callback) self.logger.info(f"Registered with consumer-tag: {self._consumer_token}") self._channel.start_consuming() - except Exception as ex: + except Exception: self.logger.warning( - f"An unexpected exception occurred while consuming messages. Consuming will stop.\n{ex}" + f"An unexpected exception occurred while consuming messages. Consuming will stop." ) + raise finally: self.stop_consuming() From 76985e83edb95f8ea8c6db2974d232de08b3c79e Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 13:25:53 +0200 Subject: [PATCH 4/7] RED-4653: Added some debugging code to test if closing the connection needed --- pyinfra/queue/queue_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 7762ffc..5021114 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -3,6 +3,7 @@ import json import logging import signal from typing import Callable +from os import environ import pika import pika.exceptions @@ -58,7 +59,9 @@ class QueueManager(object): def _close_channel(self): self._channel.close() - self._connection.close() + + if environ.get("CLOSE_CONNECTION", False) == "True": + self._connection.close() def start_consuming(self, process_message_callback: Callable): self._open_channel() From 2cffab279db2f38787d0f97dfb9d106b8d5d164f Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 13:39:09 +0200 Subject: [PATCH 5/7] RED-4653: Changed code to close only the connection instead of the channel & connection to see if that is sufficient for a clean shutdown --- pyinfra/queue/queue_manager.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 5021114..a60159b 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -58,10 +58,7 @@ class QueueManager(object): self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True) def _close_channel(self): - self._channel.close() - - if environ.get("CLOSE_CONNECTION", False) == "True": - self._connection.close() + self._connection.close() def start_consuming(self, process_message_callback: Callable): self._open_channel() From 89ce61996cd3397cad11436ad57ac03dfa487c05 Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 14:08:58 +0200 Subject: [PATCH 6/7] RED-4653: Reordered code to prevent errors on application shutdown --- pyinfra/queue/queue_manager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index a60159b..81bbb20 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -58,17 +58,18 @@ class QueueManager(object): self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True) def _close_channel(self): + self._channel.close() self._connection.close() def start_consuming(self, process_message_callback: Callable): - self._open_channel() - callback = self._create_queue_callback(process_message_callback) - self.logger.info("Consuming from queue") self._consumer_token = None + self.logger.info("Consuming from queue") try: + self._open_channel() + self._consumer_token = self._channel.basic_consume(self._input_queue, callback) self.logger.info(f"Registered with consumer-tag: {self._consumer_token}") self._channel.start_consuming() @@ -79,15 +80,14 @@ class QueueManager(object): raise finally: self.stop_consuming() + self._close_channel() def stop_consuming(self): if self._consumer_token and self._connection: self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}") - self._channel.basic_cancel(self._consumer_token) + self._channel.stop_consuming(self._consumer_token) self._consumer_token = None - self._close_channel() - def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs): self.logger.info(f"Received signal {signal_number}") self.stop_consuming() From 0efbd2c98cecaa1e33991473b1b120827df60ae9 Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Mon, 1 Aug 2022 14:15:03 +0200 Subject: [PATCH 7/7] RED-4653: Removed unnecessary string formatting --- pyinfra/queue/queue_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 81bbb20..c599fff 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -75,7 +75,7 @@ class QueueManager(object): self._channel.start_consuming() except Exception: self.logger.warning( - f"An unexpected exception occurred while consuming messages. Consuming will stop." + "An unexpected exception occurred while consuming messages. Consuming will stop." ) raise finally: