change redelivery logic

Nack on message redelivered messages disabled for
DHL. Callback processing might need ~15h, usually
for any reason, connection to the rabbitmq server
is lost in this time. Although the result is
uploaded, the caller won't get informed about
this. Therefore, if a message is reprocessed,
there is a lookup in the callback if the response
objects are there. If so, the message gets
acknowledged, otherwise the message will be
discarded via should publish result flag.
This commit is contained in:
Julius Unverfehrt 2023-02-08 08:40:33 +01:00
parent fff6772a22
commit 62d453dbe3

View File

@ -114,18 +114,24 @@ class QueueManager(object):
# Only try to process each message once.
# Requeueing will be handled by the dead-letter-exchange.
# This prevents endless retries on messages that are impossible to process.
if frame.redelivered:
self.logger.info(
f"Aborting message processing for delivery_tag {frame.delivery_tag} " f"due to it being redelivered"
)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
return
# FIXME: disabled for DHL. Callback processing might need ~15h, usually for any reason, connection to the
# rabbitmq server is lost in this time. Although the result is uploaded, the caller won't get informed
# about this. Therefore, if a message is reprocessed, there is a lookup in the callback if the response
# objects are there. If so, the message gets acknowledged, otherwise the message will be discarded via
# should publish result flag.
# if frame.redelivered:
# self.logger.info(
# f"Aborting message processing for delivery_tag {frame.delivery_tag} " f"due to it being redelivered"
# )
# self._channel.basic_nack(frame.delivery_tag, requeue=False)
# return
self.logger.debug(f"Processing {(frame, properties, body)}.")
is_redelivered = frame.redelivered
try:
unpacked_message_body = json.loads(body)
should_publish_result, callback_result = process_message_callback(unpacked_message_body)
should_publish_result, callback_result = process_message_callback(unpacked_message_body, is_redelivered)
if should_publish_result:
self.logger.info(