feat: add logger to retry
This commit is contained in:
parent
c6e336cb35
commit
86af05c12c
@ -21,6 +21,9 @@ from aio_pika.exceptions import (
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from kn_utils.logging import logger
|
||||
from kn_utils.retry import retry
|
||||
from tenacity import before_log
|
||||
from tenacity import retry as t_retry
|
||||
from tenacity import stop_after_attempt
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -75,11 +78,7 @@ class AsyncQueueManager:
|
||||
|
||||
self.message_count: int = 0
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=AMQPConnectionError,
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=AMQPConnectionError, reraise=True, logger=logger)
|
||||
async def connect(self) -> None:
|
||||
logger.info("Attempting to connect to RabbitMQ...")
|
||||
self.connection = await connect(**self.config.connection_params)
|
||||
@ -113,11 +112,7 @@ class AsyncQueueManager:
|
||||
return False
|
||||
return True
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_exchanges(self) -> None:
|
||||
self.tenant_exchange = await self.channel.declare_exchange(
|
||||
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
||||
@ -129,11 +124,7 @@ class AsyncQueueManager:
|
||||
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
||||
)
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_tenant_queue(self) -> None:
|
||||
self.tenant_exchange_queue = await self.channel.declare_queue(
|
||||
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
|
||||
@ -253,11 +244,7 @@ class AsyncQueueManager:
|
||||
)
|
||||
logger.info(f"Published result to queue {tenant_id}.")
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, logger=logger)
|
||||
async def fetch_active_tenants(self) -> Set[str]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(self.tenant_service_url) as response:
|
||||
@ -278,6 +265,7 @@ class AsyncQueueManager:
|
||||
ChannelInvalidStateError,
|
||||
),
|
||||
reraise=True,
|
||||
logger=logger,
|
||||
)
|
||||
async def initialize_tenant_queues(self, active_tenants: set) -> None:
|
||||
for tenant_id in active_tenants:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user