From cb8509b1206ef68d419d500c2dcbbd1bb4d0d6d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Thu, 1 Aug 2024 17:42:59 +0200 Subject: [PATCH] refactor: message counter --- pyinfra/queue/async_manager.py | 6 ++++-- pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 17a50eb..75efd8e 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -164,8 +164,9 @@ class AsyncQueueManager: await self.shutdown() return + self.message_count += 1 + try: - self.message_count += 1 tenant_id = message.routing_key filtered_message_headers = ( @@ -183,7 +184,6 @@ class AsyncQueueManager: await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers) await message.ack() logger.debug(f"Message with {message.delivery_tag=} acknowledged.") - self.message_count -= 1 else: raise ValueError(f"Could not process message with {message.body=}.") @@ -197,6 +197,8 @@ class AsyncQueueManager: await message.nack(requeue=False) logger.error(f"Error processing input message: {e}", exc_info=True) raise + finally: + self.message_count -= 1 async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None: await self.output_exchange.publish( diff --git a/pyproject.toml b/pyproject.toml index 13504bd..2fc214f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyinfra" -version = "3.0.0" +version = "3.1.0" description = "" authors = ["Team Research "] license = "All rights reseverd"