diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 0932c41..be9b6b8 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -197,54 +197,52 @@ class AsyncQueueManager: return result async with message.process(ignore_processed=True): - await message.nack(requeue=False) - return - # if message.redelivered: - # logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.") - # await message.nack(requeue=False) - # return + if message.redelivered: + logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.") + await message.nack(requeue=False) + return - # if message.body.decode("utf-8") == "STOP": - # logger.info("Received stop signal, stopping consumption...") - # await message.ack() - # # TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal - # await self.shutdown() - # return + if message.body.decode("utf-8") == "STOP": + logger.info("Received stop signal, stopping consumption...") + await message.ack() + # TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal + await self.shutdown() + return - # self.message_count += 1 + self.message_count += 1 - # try: - # tenant_id = message.routing_key + try: + tenant_id = message.routing_key - # filtered_message_headers = ( - # {k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {} - # ) + filtered_message_headers = ( + {k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {} + ) - # logger.debug(f"Processing message with {filtered_message_headers=}.") + logger.debug(f"Processing message with {filtered_message_headers=}.") - # result: dict = await ( - # process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers}) - # or {} - # ) + result: dict = await ( + process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers}) + or {} + ) - # if result: - # await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers) - # await message.ack() - # logger.debug(f"Message with {message.delivery_tag=} acknowledged.") - # else: - # raise ValueError(f"Could not process message with {message.body=}.") + if result: + await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers) + await message.ack() + logger.debug(f"Message with {message.delivery_tag=} acknowledged.") + else: + raise ValueError(f"Could not process message with {message.body=}.") - # except json.JSONDecodeError: - # await message.nack(requeue=False) - # logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True) - # except FileNotFoundError as e: - # logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True) - # await message.nack(requeue=False) - # except Exception as e: - # await message.nack(requeue=False) - # logger.error(f"Error processing input message: {e}", exc_info=True) - # finally: - # self.message_count -= 1 + except json.JSONDecodeError: + await message.nack(requeue=False) + logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True) + except FileNotFoundError as e: + logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True) + await message.nack(requeue=False) + except Exception as e: + await message.nack(requeue=False) + logger.error(f"Error processing input message: {e}", exc_info=True) + finally: + self.message_count -= 1 async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None: await self.output_exchange.publish(