feat: add on close callback
This commit is contained in:
parent
541219177f
commit
f855224e29
@ -1,8 +1,6 @@
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import json
|
||||
import signal
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, Set
|
||||
|
||||
@ -16,7 +14,6 @@ from aio_pika.abc import (
|
||||
AbstractQueue,
|
||||
)
|
||||
from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from kn_utils.logging import logger
|
||||
from tenacity import (
|
||||
retry,
|
||||
@ -52,6 +49,9 @@ class RabbitMQConfig:
|
||||
"login": self.username,
|
||||
"password": self.password,
|
||||
"client_properties": {"heartbeat": self.heartbeat},
|
||||
# aio_pika automatically and infinitely tries to reconnect to the broker when the connection is closed,
|
||||
# which we don't want in order to enable scale to zero. Therefore, reconnect_interval is set to None.
|
||||
# "reconnect_interval": None,
|
||||
}
|
||||
|
||||
|
||||
@ -88,10 +88,35 @@ class AsyncQueueManager:
|
||||
async def connect(self) -> None:
|
||||
logger.info("Attempting to connect to RabbitMQ...")
|
||||
self.connection = await connect_robust(**self.config.connection_params)
|
||||
self.connection.close_callbacks.add(self.on_connection_close)
|
||||
self.channel = await self.connection.channel()
|
||||
await self.channel.set_qos(prefetch_count=1)
|
||||
logger.info("Successfully connected to RabbitMQ")
|
||||
|
||||
async def on_connection_close(self, sender, exc):
|
||||
"""This is a callback for unexpected connection closures."""
|
||||
logger.debug(f"Sender: {sender}")
|
||||
logger.error("Connection to RabbitMQ lost: %s. Attempting to reconnect...", exc)
|
||||
try:
|
||||
await self.reconnect()
|
||||
except Exception as e:
|
||||
logger.error("Reconnection failed. Shutting down: %s", e)
|
||||
await self.shutdown()
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type(AMQPConnectionError),
|
||||
reraise=True,
|
||||
)
|
||||
async def reconnect(self) -> None:
|
||||
logger.info("Attempting to reconnect to RabbitMQ...")
|
||||
await self.connect()
|
||||
await self.setup_exchanges()
|
||||
await self.initialize_tenant_queues()
|
||||
await self.setup_tenant_queue()
|
||||
logger.info("Reconnected to RabbitMQ successfully. Press CTRL+C to exit.")
|
||||
|
||||
async def is_ready(self) -> bool:
|
||||
if self.connection is None or self.connection.is_closed:
|
||||
try:
|
||||
@ -320,4 +345,3 @@ class AsyncQueueManager:
|
||||
logger.error(f"Error closing connection: {e}")
|
||||
|
||||
logger.info("RabbitMQ handler shut down successfully.")
|
||||
# sys.exit(0)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user