Pull request #50: RED-5009: Changed callback to not process redelivered messages to prevent endless retries

Merge in RR/pyinfra from RED-5009 to master

Squashed commit of the following:

commit 1f8114379bdeb3af8640c71c2edde2a672bb358c
Author: Viktor Seifert <viktor.seifert@iqser.com>
Date:   Mon Aug 22 16:55:04 2022 +0200

    RED-5009: Added the possibility for a callback to signal that a message should be declined/dead-lettered

commit be674c2915f6f149c581bc2fe2783217fe424df8
Author: Viktor Seifert <viktor.seifert@iqser.com>
Date:   Fri Aug 19 16:26:38 2022 +0200

    RED-5009: Changed callback to not process redelivered messages to prevent endless retries
This commit is contained in:
Viktor Seifert 2022-08-23 10:22:13 +02:00 committed by Julius Unverfehrt
parent be82114f83
commit 71ad2af4eb

View File

@ -112,22 +112,33 @@ class QueueManager(object):
def _create_queue_callback(self, process_message_callback: Callable):
def callback(_channel, frame, properties, body):
self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
self.logger.debug(f"Processing {(frame, properties, body)}.")
# Only try to process each message once.
# Requeueing will be handled by the dead-letter-exchange.
# This prevents endless retries on messages that are impossible to process.
if frame.redelivered:
self.logger.info(f"Aborting message processing for delivery_tag {frame.delivery_tag} "
f"due to it being redelivered")
self._channel.basic_nack(frame.delivery_tag, requeue=False)
self.logger.debug(f"Processing {(frame, properties, body)}.")
try:
unpacked_message_body = json.loads(body)
callback_result = process_message_callback(unpacked_message_body)
should_publish_result, callback_result = process_message_callback(unpacked_message_body)
self.logger.info("Processed message, publishing result to result-queue")
self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
if should_publish_result:
self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, "
f"publishing result to result-queue")
self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
self.logger.info(
f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}"
)
self._channel.basic_ack(frame.delivery_tag)
self.logger.info(
f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}"
)
self._channel.basic_ack(frame.delivery_tag)
else:
self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, declining message")
self.logger.info(f"Message with delivery_tag {frame.delivery_tag} processed")
except Exception as ex:
n_attempts = _get_n_previous_attempts(properties) + 1
self.logger.warning(f"Failed to process message, {n_attempts} attempts, error: {str(ex)}")