diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 802d35a..2800920 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -1,8 +1,6 @@ import asyncio import concurrent.futures import json -import signal -import sys from dataclasses import dataclass, field from typing import Any, Callable, Dict, Set @@ -16,7 +14,6 @@ from aio_pika.abc import ( AbstractQueue, ) from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError -from aiormq.exceptions import AMQPConnectionError from kn_utils.logging import logger from tenacity import ( retry, @@ -52,6 +49,9 @@ class RabbitMQConfig: "login": self.username, "password": self.password, "client_properties": {"heartbeat": self.heartbeat}, + # aio_pika automatically and infinitely tries to reconnect to the broker when the connection is closed, + # which we don't want in order to enable scale to zero. Therefore, reconnect_interval is set to None. + # "reconnect_interval": None, } @@ -88,10 +88,35 @@ class AsyncQueueManager: async def connect(self) -> None: logger.info("Attempting to connect to RabbitMQ...") self.connection = await connect_robust(**self.config.connection_params) + self.connection.close_callbacks.add(self.on_connection_close) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=1) logger.info("Successfully connected to RabbitMQ") + async def on_connection_close(self, sender, exc): + """This is a callback for unexpected connection closures.""" + logger.debug(f"Sender: {sender}") + logger.error("Connection to RabbitMQ lost: %s. Attempting to reconnect...", exc) + try: + await self.reconnect() + except Exception as e: + logger.error("Reconnection failed. Shutting down: %s", e) + await self.shutdown() + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type(AMQPConnectionError), + reraise=True, + ) + async def reconnect(self) -> None: + logger.info("Attempting to reconnect to RabbitMQ...") + await self.connect() + await self.setup_exchanges() + await self.initialize_tenant_queues() + await self.setup_tenant_queue() + logger.info("Reconnected to RabbitMQ successfully. Press CTRL+C to exit.") + async def is_ready(self) -> bool: if self.connection is None or self.connection.is_closed: try: @@ -320,4 +345,3 @@ class AsyncQueueManager: logger.error(f"Error closing connection: {e}") logger.info("RabbitMQ handler shut down successfully.") - # sys.exit(0)