Pull request #46: RED-4653
Merge in RR/pyinfra from RED-4653 to master * commit '0efbd2c98cecaa1e33991473b1b120827df60ae9': RED-4653: Removed unnecessary string formatting RED-4653: Reordered code to prevent errors on application shutdown RED-4653: Changed code to close only the connection instead of the channel & connection to see if that is sufficient for a clean shutdown RED-4653: Added some debugging code to test if closing the connection needed RED-4653: Corrected exception block to not swallow exceptions RED-4653: Switch to closing channel instead of only cancelling subscription on shutdown. RED-4653: Corrected signal handler by correctly handling passed params
This commit is contained in:
commit
3ccc4a1547
@ -3,11 +3,12 @@ import json
|
||||
import logging
|
||||
import signal
|
||||
from typing import Callable
|
||||
from os import environ
|
||||
|
||||
import pika
|
||||
import pika.exceptions
|
||||
|
||||
from pyinfra.config import get_config, Config
|
||||
from pyinfra.config import Config
|
||||
|
||||
pika_logger = logging.getLogger("pika")
|
||||
pika_logger.setLevel(logging.WARNING)
|
||||
@ -31,50 +32,66 @@ def _get_n_previous_attempts(props):
|
||||
|
||||
class QueueManager(object):
|
||||
def __init__(self, config: Config):
|
||||
connection_params = get_connection_params(config)
|
||||
self.logger = logging.getLogger("queue_manager")
|
||||
self.logger.setLevel(config.logging_level_root)
|
||||
|
||||
atexit.register(self.stop_consuming)
|
||||
signal.signal(signal.SIGTERM, self.stop_consuming)
|
||||
signal.signal(signal.SIGINT, self.stop_consuming)
|
||||
self._consumer_token = None
|
||||
|
||||
self._connection = pika.BlockingConnection(parameters=connection_params)
|
||||
self._channel = self._connection.channel()
|
||||
self._channel.basic_qos(prefetch_count=1)
|
||||
|
||||
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": config.dead_letter_queue}
|
||||
self._connection_params = get_connection_params(config)
|
||||
|
||||
self._input_queue = config.request_queue
|
||||
self._output_queue = config.response_queue
|
||||
self._dead_letter_queue = config.dead_letter_queue
|
||||
|
||||
atexit.register(self.stop_consuming)
|
||||
signal.signal(signal.SIGTERM, self._handle_stop_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_stop_signal)
|
||||
|
||||
def _open_channel(self):
|
||||
self._connection = pika.BlockingConnection(parameters=self._connection_params)
|
||||
self._channel = self._connection.channel()
|
||||
self._channel.basic_qos(prefetch_count=1)
|
||||
|
||||
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._dead_letter_queue}
|
||||
|
||||
self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True)
|
||||
self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True)
|
||||
|
||||
self._consumer_token = None
|
||||
|
||||
self.logger = logging.getLogger("queue_manager")
|
||||
self.logger.setLevel(config.logging_level_root)
|
||||
def _close_channel(self):
|
||||
self._channel.close()
|
||||
self._connection.close()
|
||||
|
||||
def start_consuming(self, process_message_callback: Callable):
|
||||
callback = self._create_queue_callback(process_message_callback)
|
||||
|
||||
self.logger.info("Consuming from queue")
|
||||
self._consumer_token = None
|
||||
|
||||
self.logger.info("Consuming from queue")
|
||||
try:
|
||||
self._open_channel()
|
||||
|
||||
self._consumer_token = self._channel.basic_consume(self._input_queue, callback)
|
||||
self.logger.info(f"Registered with consumer-tag: {self._consumer_token}")
|
||||
self._channel.start_consuming()
|
||||
except Exception:
|
||||
self.logger.warning(
|
||||
"An unexpected exception occurred while consuming messages. Consuming will stop."
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
self.logger.warning("An unhandled exception occurred while consuming messages. Consuming will stop.")
|
||||
self.stop_consuming()
|
||||
self._close_channel()
|
||||
|
||||
def stop_consuming(self):
|
||||
if self._consumer_token and self._connection:
|
||||
self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}")
|
||||
self._channel.basic_cancel(self._consumer_token)
|
||||
self._connection.close()
|
||||
|
||||
self._channel.stop_consuming(self._consumer_token)
|
||||
self._consumer_token = None
|
||||
|
||||
def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs):
|
||||
self.logger.info(f"Received signal {signal_number}")
|
||||
self.stop_consuming()
|
||||
|
||||
def _create_queue_callback(self, process_message_callback: Callable):
|
||||
def callback(_channel, frame, properties, body):
|
||||
self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user