diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 5cde19f..8274048 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -1,10 +1,11 @@ import json import logging +import time import pika from pyinfra.config import CONFIG -from pyinfra.exceptions import ProcessingFailure +from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager logger = logging.getLogger("pika") @@ -111,7 +112,7 @@ class PikaQueueManager(QueueManager): response = json.dumps(callback(json.loads(body))) self.channel.basic_publish("", self._output_queue, response.encode()) self.channel.basic_ack(frame.delivery_tag) - except ProcessingFailure: + except (ProcessingFailure, DataLoadingFailure): logger.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}") diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/adapters/s3.py index 04ecbb4..06a1f69 100644 --- a/pyinfra/storage/adapters/s3.py +++ b/pyinfra/storage/adapters/s3.py @@ -5,6 +5,7 @@ from operator import attrgetter from minio import Minio +from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter logger = logging.getLogger(__name__) @@ -34,6 +35,8 @@ class S3StorageAdapter(StorageAdapter): try: response = self.__client.get_object(bucket_name, object_name) return response.data + except Exception as err: + raise DataLoadingFailure("Failed getting object from s3 client") from err finally: if response: response.close() diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index 827c914..0d50093 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,3 +1,4 @@ +from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter @@ -15,7 +16,13 @@ class Storage: self.__adapter.put_object(bucket_name, object_name, data) def get_object(self, bucket_name, object_name): - return self.__adapter.get_object(bucket_name, object_name) + return self.__get_object(bucket_name, object_name) + + def __get_object(self, bucket_name, object_name): + try: + return self.__adapter.get_object(bucket_name, object_name) + except Exception as err: + raise DataLoadingFailure from err def get_all_objects(self, bucket_name): return self.__adapter.get_all_objects(bucket_name) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index e49a4cd..1fbdab0 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -76,7 +76,7 @@ class QueueVisitor: return gzip.decompress(download()) except Exception as err: logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure() from err + raise DataLoadingFailure from err def process_data(self, data, body): return self.callback({**body, "data": data}) diff --git a/src/serve.py b/src/serve.py index f670e74..2fc4f9f 100644 --- a/src/serve.py +++ b/src/serve.py @@ -59,7 +59,7 @@ def main(): try: consumer.basic_consume_and_publish() except Exception as err: - raise ConsumerError() from err + raise ConsumerError from err try: consume() diff --git a/test/unit_tests/storage_test.py b/test/unit_tests/storage_test.py index efb7b62..90354d7 100644 --- a/test/unit_tests/storage_test.py +++ b/test/unit_tests/storage_test.py @@ -2,6 +2,8 @@ import logging import pytest +from pyinfra.exceptions import DataLoadingFailure + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -43,3 +45,8 @@ class TestStorage: storage.put_object(bucket_name, "file2", b"content 2") full_names_received = storage.get_all_object_names(bucket_name) assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received} + + def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name): + storage.clear_bucket(bucket_name) + with pytest.raises(DataLoadingFailure): + storage.get_object(bucket_name, "folder/file")