diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index b8159fe..5424c6f 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -114,18 +114,24 @@ class QueueManager(object): # Only try to process each message once. # Requeueing will be handled by the dead-letter-exchange. # This prevents endless retries on messages that are impossible to process. - if frame.redelivered: - self.logger.info( - f"Aborting message processing for delivery_tag {frame.delivery_tag} " f"due to it being redelivered" - ) - self._channel.basic_nack(frame.delivery_tag, requeue=False) - return + # FIXME: disabled for DHL. Callback processing might need ~15h, usually for any reason, connection to the + # rabbitmq server is lost in this time. Although the result is uploaded, the caller won't get informed + # about this. Therefore, if a message is reprocessed, there is a lookup in the callback if the response + # objects are there. If so, the message gets acknowledged, otherwise the message will be discarded via + # should publish result flag. + # if frame.redelivered: + # self.logger.info( + # f"Aborting message processing for delivery_tag {frame.delivery_tag} " f"due to it being redelivered" + # ) + # self._channel.basic_nack(frame.delivery_tag, requeue=False) + # return self.logger.debug(f"Processing {(frame, properties, body)}.") + is_redelivered = frame.redelivered try: unpacked_message_body = json.loads(body) - should_publish_result, callback_result = process_message_callback(unpacked_message_body) + should_publish_result, callback_result = process_message_callback(unpacked_message_body, is_redelivered) if should_publish_result: self.logger.info(