RED-4653: Switch to closing channel instead of only cancelling subscription on shutdown.

Changed queue-consumption shutdown to close the channel before closing the connection, since only cancelling the consumers doesn't clean-up the channel correctly, which in turn can cause an error when closing the connection.  Also reordered the code so that the connection and channel are only opened when queue-consumption starts.
This commit is contained in:
Viktor Seifert 2022-08-01 12:25:27 +02:00
parent fc1f23a24d
commit 5cdf4df4a3

View File

@ -31,30 +31,38 @@ def _get_n_previous_attempts(props):
class QueueManager(object):
def __init__(self, config: Config):
connection_params = get_connection_params(config)
self.logger = logging.getLogger("queue_manager")
self.logger.setLevel(config.logging_level_root)
self._consumer_token = None
self._connection_params = get_connection_params(config)
self._input_queue = config.request_queue
self._output_queue = config.response_queue
self._dead_letter_queue = config.dead_letter_queue
atexit.register(self.stop_consuming)
signal.signal(signal.SIGTERM, self._handle_stop_signal)
signal.signal(signal.SIGINT, self._handle_stop_signal)
self._connection = pika.BlockingConnection(parameters=connection_params)
def _open_channel(self):
self._connection = pika.BlockingConnection(parameters=self._connection_params)
self._channel = self._connection.channel()
self._channel.basic_qos(prefetch_count=1)
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": config.dead_letter_queue}
self._input_queue = config.request_queue
self._output_queue = config.response_queue
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._dead_letter_queue}
self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True)
self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True)
self._consumer_token = None
self.logger = logging.getLogger("queue_manager")
self.logger.setLevel(config.logging_level_root)
def _close_channel(self):
self._channel.close()
self._connection.close()
def start_consuming(self, process_message_callback: Callable):
self._open_channel()
callback = self._create_queue_callback(process_message_callback)
self.logger.info("Consuming from queue")
@ -63,18 +71,21 @@ class QueueManager(object):
self._consumer_token = self._channel.basic_consume(self._input_queue, callback)
self.logger.info(f"Registered with consumer-tag: {self._consumer_token}")
self._channel.start_consuming()
except Exception as ex:
self.logger.warning(
f"An unexpected exception occurred while consuming messages. Consuming will stop.\n{ex}"
)
finally:
self.logger.warning("An unhandled exception occurred while consuming messages. Consuming will stop.")
self.stop_consuming()
def stop_consuming(self):
if self._consumer_token and self._connection:
self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}")
self._channel.basic_cancel(self._consumer_token)
self._connection.close()
self._consumer_token = None
self._close_channel()
def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs):
self.logger.info(f"Received signal {signal_number}")
self.stop_consuming()