From 6d31cbe6353c1ab06e510b746de1a9d84d742a84 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 31 May 2022 13:13:18 +0200 Subject: [PATCH] Pull request #37: RED-4124 PyInfra now tries three times to download an unobtainable object before republishing message, removed unreadable stack trace print if message is put on dead letter queue since allready logged where the exeption is raised. Merge in RR/pyinfra from RED-4124-s3-retry to master Squashed commit of the following: commit 40e4dc3712fc692115b8274226430a431bf0192f Author: Julius Unverfehrt Date: Tue May 31 12:59:45 2022 +0200 RED-4124 PyInfra now tries three times to download an unobtainable object before republishing message, removed unreadable stack trace print if message is put on dead letter queue since allready logged where the exeption is raised. --- pyinfra/queue/queue_manager/pika_queue_manager.py | 2 +- pyinfra/storage/storage.py | 10 ++++++++++ scripts/mock_client.py | 8 ++++++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 8274048..5ff27be 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -97,7 +97,7 @@ class PikaQueueManager(QueueManager): self.channel.basic_publish("", self._input_queue, json.dumps(request).encode()) def reject(self, body, frame): - logger.exception(f"Adding to dead letter queue: {body}") + logger.error(f"Adding to dead letter queue: {body}") self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False) def publish_response(self, message, callback, max_attempts=3): diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index 0d50093..c93ccf7 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,6 +1,14 @@ +import logging + +from retry import retry + +from pyinfra.config import CONFIG from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter +logger = logging.getLogger(__name__) +logger.setLevel(CONFIG.service.logging_level) + class Storage: def __init__(self, adapter: StorageAdapter): @@ -18,10 +26,12 @@ class Storage: def get_object(self, bucket_name, object_name): return self.__get_object(bucket_name, object_name) + @retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3)) def __get_object(self, bucket_name, object_name): try: return self.__adapter.get_object(bucket_name, object_name) except Exception as err: + logging.error(err) raise DataLoadingFailure from err def get_all_objects(self, bucket_name): diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 0a97e03..db620d0 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -10,7 +10,7 @@ from pyinfra.storage.storages import get_s3_storage def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--bucket_name", "-b", required=True) - parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True) + parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True) args = parser.parse_args() return args @@ -47,6 +47,8 @@ def build_message_bodies(analyse_container_type, bucket_name): def update_message(message_dict): if analyse_container_type == "detr" or analyse_container_type == "image": message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"}) + if analyse_container_type == "dl_error": + message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"}) if analyse_container_type == "ner": message_dict.update( {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"} @@ -74,7 +76,9 @@ def main(args): channel.basic_publish("", CONFIG.rabbitmq.queues.input, body) print(f"Put {body} on {CONFIG.rabbitmq.queues.input}") - for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): + for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1): + if not body: + break print(f"Received {json.loads(body)}") channel.basic_ack(method_frame.delivery_tag) channel.close()