From 596d4a9bd00c3bb964d6d1b7f8d90e576665fd30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Mon, 22 Jul 2024 16:48:31 +0200 Subject: [PATCH] feat: add expiration for tenant event queue and retry to tenant api call --- pyinfra/queue/async_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index abdd38f..be7b02d 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -13,6 +13,7 @@ from aio_pika.abc import ( AbstractIncomingMessage, ) from kn_utils.logging import logger +from retry import retry @dataclass @@ -88,6 +89,8 @@ class AsyncQueueManager: arguments={ "x-dead-letter-exchange": "", "x-dead-letter-routing-key": self.config.service_dead_letter_queue_name, + "x-expires": self.config.queue_expiration_time, + "x-max-priority": 2, }, ) await queue.bind(self.tenant_exchange, routing_key="tenant.*") @@ -187,6 +190,7 @@ class AsyncQueueManager: ) logger.info(f"Published result to queue {tenant_id}.") + @retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(aiohttp.ClientError)) async def fetch_active_tenants(self) -> Set[str]: try: async with aiohttp.ClientSession() as session: