fix: dlq init
This commit is contained in:
parent
3fb8c4e641
commit
87f57e2244
253
poetry.lock
generated
253
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -122,6 +122,11 @@ class AsyncQueueManager:
|
||||
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
||||
)
|
||||
|
||||
# we must declare DLQ to handle error messages
|
||||
self.dead_letter_queue = await self.channel.declare_queue(
|
||||
self.config.service_dead_letter_queue_name, durable=True
|
||||
)
|
||||
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_tenant_queue(self) -> None:
|
||||
self.tenant_exchange_queue = await self.channel.declare_queue(
|
||||
@ -160,6 +165,10 @@ class AsyncQueueManager:
|
||||
input_queue = await self.channel.declare_queue(
|
||||
queue_name,
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||
},
|
||||
)
|
||||
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
|
||||
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
|
||||
@ -188,52 +197,54 @@ class AsyncQueueManager:
|
||||
return result
|
||||
|
||||
async with message.process(ignore_processed=True):
|
||||
if message.redelivered:
|
||||
logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.")
|
||||
await message.nack(requeue=False)
|
||||
return
|
||||
await message.nack(requeue=False)
|
||||
return
|
||||
# if message.redelivered:
|
||||
# logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.")
|
||||
# await message.nack(requeue=False)
|
||||
# return
|
||||
|
||||
if message.body.decode("utf-8") == "STOP":
|
||||
logger.info("Received stop signal, stopping consumption...")
|
||||
await message.ack()
|
||||
# TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal
|
||||
await self.shutdown()
|
||||
return
|
||||
# if message.body.decode("utf-8") == "STOP":
|
||||
# logger.info("Received stop signal, stopping consumption...")
|
||||
# await message.ack()
|
||||
# # TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal
|
||||
# await self.shutdown()
|
||||
# return
|
||||
|
||||
self.message_count += 1
|
||||
# self.message_count += 1
|
||||
|
||||
try:
|
||||
tenant_id = message.routing_key
|
||||
# try:
|
||||
# tenant_id = message.routing_key
|
||||
|
||||
filtered_message_headers = (
|
||||
{k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {}
|
||||
)
|
||||
# filtered_message_headers = (
|
||||
# {k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {}
|
||||
# )
|
||||
|
||||
logger.debug(f"Processing message with {filtered_message_headers=}.")
|
||||
# logger.debug(f"Processing message with {filtered_message_headers=}.")
|
||||
|
||||
result: dict = await (
|
||||
process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers})
|
||||
or {}
|
||||
)
|
||||
# result: dict = await (
|
||||
# process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers})
|
||||
# or {}
|
||||
# )
|
||||
|
||||
if result:
|
||||
await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
|
||||
await message.ack()
|
||||
logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
|
||||
else:
|
||||
raise ValueError(f"Could not process message with {message.body=}.")
|
||||
# if result:
|
||||
# await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
|
||||
# await message.ack()
|
||||
# logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
|
||||
# else:
|
||||
# raise ValueError(f"Could not process message with {message.body=}.")
|
||||
|
||||
except json.JSONDecodeError:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
|
||||
except FileNotFoundError as e:
|
||||
logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
|
||||
await message.nack(requeue=False)
|
||||
except Exception as e:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Error processing input message: {e}", exc_info=True)
|
||||
finally:
|
||||
self.message_count -= 1
|
||||
# except json.JSONDecodeError:
|
||||
# await message.nack(requeue=False)
|
||||
# logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
|
||||
# except FileNotFoundError as e:
|
||||
# logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
|
||||
# await message.nack(requeue=False)
|
||||
# except Exception as e:
|
||||
# await message.nack(requeue=False)
|
||||
# logger.error(f"Error processing input message: {e}", exc_info=True)
|
||||
# finally:
|
||||
# self.message_count -= 1
|
||||
|
||||
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
|
||||
await self.output_exchange.publish(
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pyinfra"
|
||||
version = "3.4.1"
|
||||
version = "3.4.2"
|
||||
description = ""
|
||||
authors = ["Team Research <research@knecon.com>"]
|
||||
license = "All rights reseverd"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user