refactor: message counter
This commit is contained in:
parent
47b42e95e2
commit
cb8509b120
@ -164,8 +164,9 @@ class AsyncQueueManager:
|
||||
await self.shutdown()
|
||||
return
|
||||
|
||||
self.message_count += 1
|
||||
|
||||
try:
|
||||
self.message_count += 1
|
||||
tenant_id = message.routing_key
|
||||
|
||||
filtered_message_headers = (
|
||||
@ -183,7 +184,6 @@ class AsyncQueueManager:
|
||||
await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
|
||||
await message.ack()
|
||||
logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
|
||||
self.message_count -= 1
|
||||
else:
|
||||
raise ValueError(f"Could not process message with {message.body=}.")
|
||||
|
||||
@ -197,6 +197,8 @@ class AsyncQueueManager:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Error processing input message: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
self.message_count -= 1
|
||||
|
||||
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
|
||||
await self.output_exchange.publish(
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pyinfra"
|
||||
version = "3.0.0"
|
||||
version = "3.1.0"
|
||||
description = ""
|
||||
authors = ["Team Research <research@knecon.com>"]
|
||||
license = "All rights reseverd"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user