diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index e1d4543..c599fff 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -3,11 +3,12 @@ import json import logging import signal from typing import Callable +from os import environ import pika import pika.exceptions -from pyinfra.config import get_config, Config +from pyinfra.config import Config pika_logger = logging.getLogger("pika") pika_logger.setLevel(logging.WARNING) @@ -31,50 +32,66 @@ def _get_n_previous_attempts(props): class QueueManager(object): def __init__(self, config: Config): - connection_params = get_connection_params(config) + self.logger = logging.getLogger("queue_manager") + self.logger.setLevel(config.logging_level_root) - atexit.register(self.stop_consuming) - signal.signal(signal.SIGTERM, self.stop_consuming) - signal.signal(signal.SIGINT, self.stop_consuming) + self._consumer_token = None - self._connection = pika.BlockingConnection(parameters=connection_params) - self._channel = self._connection.channel() - self._channel.basic_qos(prefetch_count=1) - - args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": config.dead_letter_queue} + self._connection_params = get_connection_params(config) self._input_queue = config.request_queue self._output_queue = config.response_queue + self._dead_letter_queue = config.dead_letter_queue + + atexit.register(self.stop_consuming) + signal.signal(signal.SIGTERM, self._handle_stop_signal) + signal.signal(signal.SIGINT, self._handle_stop_signal) + + def _open_channel(self): + self._connection = pika.BlockingConnection(parameters=self._connection_params) + self._channel = self._connection.channel() + self._channel.basic_qos(prefetch_count=1) + + args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._dead_letter_queue} self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True) self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True) - self._consumer_token = None - - self.logger = logging.getLogger("queue_manager") - self.logger.setLevel(config.logging_level_root) + def _close_channel(self): + self._channel.close() + self._connection.close() def start_consuming(self, process_message_callback: Callable): callback = self._create_queue_callback(process_message_callback) - self.logger.info("Consuming from queue") self._consumer_token = None + + self.logger.info("Consuming from queue") try: + self._open_channel() + self._consumer_token = self._channel.basic_consume(self._input_queue, callback) self.logger.info(f"Registered with consumer-tag: {self._consumer_token}") self._channel.start_consuming() + except Exception: + self.logger.warning( + "An unexpected exception occurred while consuming messages. Consuming will stop." + ) + raise finally: - self.logger.warning("An unhandled exception occurred while consuming messages. Consuming will stop.") self.stop_consuming() + self._close_channel() def stop_consuming(self): if self._consumer_token and self._connection: self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}") - self._channel.basic_cancel(self._consumer_token) - self._connection.close() - + self._channel.stop_consuming(self._consumer_token) self._consumer_token = None + def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs): + self.logger.info(f"Received signal {signal_number}") + self.stop_consuming() + def _create_queue_callback(self, process_message_callback: Callable): def callback(_channel, frame, properties, body): self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")