diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index abdd38f..be7b02d 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -13,6 +13,7 @@ from aio_pika.abc import ( AbstractIncomingMessage, ) from kn_utils.logging import logger +from retry import retry @dataclass @@ -88,6 +89,8 @@ class AsyncQueueManager: arguments={ "x-dead-letter-exchange": "", "x-dead-letter-routing-key": self.config.service_dead_letter_queue_name, + "x-expires": self.config.queue_expiration_time, + "x-max-priority": 2, }, ) await queue.bind(self.tenant_exchange, routing_key="tenant.*") @@ -187,6 +190,7 @@ class AsyncQueueManager: ) logger.info(f"Published result to queue {tenant_id}.") + @retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(aiohttp.ClientError)) async def fetch_active_tenants(self) -> Set[str]: try: async with aiohttp.ClientSession() as session: