chore: remove test nack

This commit is contained in:
Jonathan Kössler 2025-01-15 10:00:50 +01:00
parent 87f57e2244
commit ea0c55930a

View File

@ -197,54 +197,52 @@ class AsyncQueueManager:
return result
async with message.process(ignore_processed=True):
await message.nack(requeue=False)
return
# if message.redelivered:
# logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.")
# await message.nack(requeue=False)
# return
if message.redelivered:
logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.")
await message.nack(requeue=False)
return
# if message.body.decode("utf-8") == "STOP":
# logger.info("Received stop signal, stopping consumption...")
# await message.ack()
# # TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal
# await self.shutdown()
# return
if message.body.decode("utf-8") == "STOP":
logger.info("Received stop signal, stopping consumption...")
await message.ack()
# TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal
await self.shutdown()
return
# self.message_count += 1
self.message_count += 1
# try:
# tenant_id = message.routing_key
try:
tenant_id = message.routing_key
# filtered_message_headers = (
# {k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {}
# )
filtered_message_headers = (
{k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {}
)
# logger.debug(f"Processing message with {filtered_message_headers=}.")
logger.debug(f"Processing message with {filtered_message_headers=}.")
# result: dict = await (
# process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers})
# or {}
# )
result: dict = await (
process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers})
or {}
)
# if result:
# await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
# await message.ack()
# logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
# else:
# raise ValueError(f"Could not process message with {message.body=}.")
if result:
await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
await message.ack()
logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
else:
raise ValueError(f"Could not process message with {message.body=}.")
# except json.JSONDecodeError:
# await message.nack(requeue=False)
# logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
# except FileNotFoundError as e:
# logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
# await message.nack(requeue=False)
# except Exception as e:
# await message.nack(requeue=False)
# logger.error(f"Error processing input message: {e}", exc_info=True)
# finally:
# self.message_count -= 1
except json.JSONDecodeError:
await message.nack(requeue=False)
logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
except FileNotFoundError as e:
logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
await message.nack(requeue=False)
except Exception as e:
await message.nack(requeue=False)
logger.error(f"Error processing input message: {e}", exc_info=True)
finally:
self.message_count -= 1
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
await self.output_exchange.publish(