diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 39383a7..e5cf5d2 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -112,22 +112,33 @@ class QueueManager(object): def _create_queue_callback(self, process_message_callback: Callable): def callback(_channel, frame, properties, body): self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}") - self.logger.debug(f"Processing {(frame, properties, body)}.") + # Only try to process each message once. + # Requeueing will be handled by the dead-letter-exchange. + # This prevents endless retries on messages that are impossible to process. + if frame.redelivered: + self.logger.info(f"Aborting message processing for delivery_tag {frame.delivery_tag} " + f"due to it being redelivered") + self._channel.basic_nack(frame.delivery_tag, requeue=False) + + self.logger.debug(f"Processing {(frame, properties, body)}.") try: unpacked_message_body = json.loads(body) - callback_result = process_message_callback(unpacked_message_body) + should_publish_result, callback_result = process_message_callback(unpacked_message_body) - self.logger.info("Processed message, publishing result to result-queue") - self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode()) + if should_publish_result: + self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, " + f"publishing result to result-queue") + self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode()) - self.logger.info( - f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}" - ) - self._channel.basic_ack(frame.delivery_tag) + self.logger.info( + f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}" + ) + self._channel.basic_ack(frame.delivery_tag) + else: + self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, declining message") - self.logger.info(f"Message with delivery_tag {frame.delivery_tag} processed") except Exception as ex: n_attempts = _get_n_previous_attempts(properties) + 1 self.logger.warning(f"Failed to process message, {n_attempts} attempts, error: {str(ex)}")