diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 8274048..5ff27be 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -97,7 +97,7 @@ class PikaQueueManager(QueueManager): self.channel.basic_publish("", self._input_queue, json.dumps(request).encode()) def reject(self, body, frame): - logger.exception(f"Adding to dead letter queue: {body}") + logger.error(f"Adding to dead letter queue: {body}") self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False) def publish_response(self, message, callback, max_attempts=3): diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index 0d50093..c93ccf7 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,6 +1,14 @@ +import logging + +from retry import retry + +from pyinfra.config import CONFIG from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter +logger = logging.getLogger(__name__) +logger.setLevel(CONFIG.service.logging_level) + class Storage: def __init__(self, adapter: StorageAdapter): @@ -18,10 +26,12 @@ class Storage: def get_object(self, bucket_name, object_name): return self.__get_object(bucket_name, object_name) + @retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3)) def __get_object(self, bucket_name, object_name): try: return self.__adapter.get_object(bucket_name, object_name) except Exception as err: + logging.error(err) raise DataLoadingFailure from err def get_all_objects(self, bucket_name): diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 0a97e03..db620d0 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -10,7 +10,7 @@ from pyinfra.storage.storages import get_s3_storage def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--bucket_name", "-b", required=True) - parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True) + parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True) args = parser.parse_args() return args @@ -47,6 +47,8 @@ def build_message_bodies(analyse_container_type, bucket_name): def update_message(message_dict): if analyse_container_type == "detr" or analyse_container_type == "image": message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"}) + if analyse_container_type == "dl_error": + message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"}) if analyse_container_type == "ner": message_dict.update( {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"} @@ -74,7 +76,9 @@ def main(args): channel.basic_publish("", CONFIG.rabbitmq.queues.input, body) print(f"Put {body} on {CONFIG.rabbitmq.queues.input}") - for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): + for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1): + if not body: + break print(f"Received {json.loads(body)}") channel.basic_ack(method_frame.delivery_tag) channel.close()