From 71ad2af4eb278a3718ad5385b06f07faa9059e9f Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Tue, 23 Aug 2022 10:22:13 +0200 Subject: [PATCH] Pull request #50: RED-5009: Changed callback to not process redelivered messages to prevent endless retries Merge in RR/pyinfra from RED-5009 to master Squashed commit of the following: commit 1f8114379bdeb3af8640c71c2edde2a672bb358c Author: Viktor Seifert Date: Mon Aug 22 16:55:04 2022 +0200 RED-5009: Added the possibility for a callback to signal that a message should be declined/dead-lettered commit be674c2915f6f149c581bc2fe2783217fe424df8 Author: Viktor Seifert Date: Fri Aug 19 16:26:38 2022 +0200 RED-5009: Changed callback to not process redelivered messages to prevent endless retries --- pyinfra/queue/queue_manager.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 39383a7..e5cf5d2 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -112,22 +112,33 @@ class QueueManager(object): def _create_queue_callback(self, process_message_callback: Callable): def callback(_channel, frame, properties, body): self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}") - self.logger.debug(f"Processing {(frame, properties, body)}.") + # Only try to process each message once. + # Requeueing will be handled by the dead-letter-exchange. + # This prevents endless retries on messages that are impossible to process. + if frame.redelivered: + self.logger.info(f"Aborting message processing for delivery_tag {frame.delivery_tag} " + f"due to it being redelivered") + self._channel.basic_nack(frame.delivery_tag, requeue=False) + + self.logger.debug(f"Processing {(frame, properties, body)}.") try: unpacked_message_body = json.loads(body) - callback_result = process_message_callback(unpacked_message_body) + should_publish_result, callback_result = process_message_callback(unpacked_message_body) - self.logger.info("Processed message, publishing result to result-queue") - self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode()) + if should_publish_result: + self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, " + f"publishing result to result-queue") + self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode()) - self.logger.info( - f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}" - ) - self._channel.basic_ack(frame.delivery_tag) + self.logger.info( + f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}" + ) + self._channel.basic_ack(frame.delivery_tag) + else: + self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, declining message") - self.logger.info(f"Message with delivery_tag {frame.delivery_tag} processed") except Exception as ex: n_attempts = _get_n_previous_attempts(properties) + 1 self.logger.warning(f"Failed to process message, {n_attempts} attempts, error: {str(ex)}")