From 86af05c12cbf0d2b681493d08e0d87eec1f2520f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Tue, 12 Nov 2024 16:50:23 +0100 Subject: [PATCH] feat: add logger to retry --- pyinfra/queue/async_manager.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index b5f9a4f..872d494 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -21,6 +21,9 @@ from aio_pika.exceptions import ( from aiormq.exceptions import AMQPConnectionError from kn_utils.logging import logger from kn_utils.retry import retry +from tenacity import before_log +from tenacity import retry as t_retry +from tenacity import stop_after_attempt @dataclass @@ -75,11 +78,7 @@ class AsyncQueueManager: self.message_count: int = 0 - @retry( - tries=5, - exceptions=AMQPConnectionError, - reraise=True, - ) + @retry(tries=5, exceptions=AMQPConnectionError, reraise=True, logger=logger) async def connect(self) -> None: logger.info("Attempting to connect to RabbitMQ...") self.connection = await connect(**self.config.connection_params) @@ -113,11 +112,7 @@ class AsyncQueueManager: return False return True - @retry( - tries=5, - exceptions=(AMQPConnectionError, ChannelInvalidStateError), - reraise=True, - ) + @retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger) async def setup_exchanges(self) -> None: self.tenant_exchange = await self.channel.declare_exchange( self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True @@ -129,11 +124,7 @@ class AsyncQueueManager: self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True ) - @retry( - tries=5, - exceptions=(AMQPConnectionError, ChannelInvalidStateError), - reraise=True, - ) + @retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger) async def setup_tenant_queue(self) -> None: self.tenant_exchange_queue = await self.channel.declare_queue( f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}", @@ -253,11 +244,7 @@ class AsyncQueueManager: ) logger.info(f"Published result to queue {tenant_id}.") - @retry( - tries=5, - exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), - reraise=True, - ) + @retry(tries=5, exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, logger=logger) async def fetch_active_tenants(self) -> Set[str]: async with aiohttp.ClientSession() as session: async with session.get(self.tenant_service_url) as response: @@ -278,6 +265,7 @@ class AsyncQueueManager: ChannelInvalidStateError, ), reraise=True, + logger=logger, ) async def initialize_tenant_queues(self, active_tenants: set) -> None: for tenant_id in active_tenants: