Pull request #35: fix: storage data access failure now triggers dead letter queue publishing

Merge in RR/pyinfra from storage-data-access-failure-dead-letter-queue-fix to master

Squashed commit of the following:

commit 1f1fcdf0357fe7817b36eac7e369d5ef75ec5af4
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon May 2 14:27:29 2022 +0200

    fix: storage data access failure now triggers dead letter queue publishing
Commit d95b483917 (parent 0e47ba61c7) by Julius Unverfehrt, 2022-05-02 14:55:38 +02:00

6 changed files with 23 additions and 5 deletions

View File

@@ -1,10 +1,11 @@
 import json
 import logging
 import time

 import pika

 from pyinfra.config import CONFIG
-from pyinfra.exceptions import ProcessingFailure
+from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure
 from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager
+
 logger = logging.getLogger("pika")
@@ -111,7 +112,7 @@ class PikaQueueManager(QueueManager):
             response = json.dumps(callback(json.loads(body)))
             self.channel.basic_publish("", self._output_queue, response.encode())
             self.channel.basic_ack(frame.delivery_tag)
-        except ProcessingFailure:
+        except (ProcessingFailure, DataLoadingFailure):
             logger.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}")

View File

@@ -5,6 +5,7 @@ from operator import attrgetter

 from minio import Minio

+from pyinfra.exceptions import DataLoadingFailure
 from pyinfra.storage.adapters.adapter import StorageAdapter

 logger = logging.getLogger(__name__)
@@ -34,6 +35,8 @@ class S3StorageAdapter(StorageAdapter):
         try:
             response = self.__client.get_object(bucket_name, object_name)
             return response.data
+        except Exception as err:
+            raise DataLoadingFailure("Failed getting object from s3 client") from err
         finally:
             if response:
                 response.close()
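One subtlety: the `if response:` guard in the finally block is only safe if `response` is initialized before the try, since get_object can raise before the assignment. A hedged reconstruction of the full method, assuming a `response = None` line just above the hunk:

# Hedged reconstruction; `response = None` is an assumption implied by the
# `if response:` guard. Without it, a failure inside self.__client.get_object
# would leave `response` unbound when the finally block runs.
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.adapters.adapter import StorageAdapter


class S3StorageAdapter(StorageAdapter):
    def get_object(self, bucket_name, object_name):
        response = None
        try:
            response = self.__client.get_object(bucket_name, object_name)
            return response.data
        except Exception as err:
            raise DataLoadingFailure("Failed getting object from s3 client") from err
        finally:
            # The minio response holds an HTTP connection until closed; the
            # minio docs additionally recommend response.release_conn().
            if response:
                response.close()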

View File

@@ -1,3 +1,4 @@
+from pyinfra.exceptions import DataLoadingFailure
 from pyinfra.storage.adapters.adapter import StorageAdapter
@@ -15,7 +16,13 @@ class Storage:
         self.__adapter.put_object(bucket_name, object_name, data)

     def get_object(self, bucket_name, object_name):
-        return self.__adapter.get_object(bucket_name, object_name)
+        return self.__get_object(bucket_name, object_name)
+
+    def __get_object(self, bucket_name, object_name):
+        try:
+            return self.__adapter.get_object(bucket_name, object_name)
+        except Exception as err:
+            raise DataLoadingFailure from err

     def get_all_objects(self, bucket_name):
         return self.__adapter.get_all_objects(bucket_name)
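With this wrapper, callers see a single failure type no matter which adapter backend is in use, and the original error stays reachable via __cause__. A small illustration with a stub adapter; the Storage constructor signature and module path are assumptions:

# Illustrative only: a failing stub adapter, normalized by Storage.
# Assumes Storage takes its adapter as a constructor argument.
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.storage import Storage  # assumed module path


class UnreachableAdapter:
    def get_object(self, bucket_name, object_name):
        raise ConnectionError("s3 unreachable")


try:
    Storage(UnreachableAdapter()).get_object("some-bucket", "some-object")
except DataLoadingFailure as err:
    # The original error is preserved for debugging via __cause__.
    print(repr(err.__cause__))  # ConnectionError('s3 unreachable')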

View File

@@ -76,7 +76,7 @@ class QueueVisitor:
             return gzip.decompress(download())
         except Exception as err:
             logging.warning(f"Loading data from storage failed for {object_descriptor}.")
-            raise DataLoadingFailure() from err
+            raise DataLoadingFailure from err

     def process_data(self, data, body):
         return self.callback({**body, "data": data})
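Dropping the parentheses here (and on ConsumerError below) is purely cosmetic: `raise Cls from err` instantiates the exception class exactly as `raise Cls() from err` does. A self-contained demonstration with a stand-in exception class:

# Demonstrates that `raise Cls from err` instantiates the class and chains
# the original error, so the two spellings in this commit are equivalent.
class DataLoadingFailure(Exception):
    pass


try:
    try:
        raise OSError("download failed")
    except Exception as err:
        raise DataLoadingFailure from err
except DataLoadingFailure as failure:
    assert isinstance(failure, DataLoadingFailure)  # class was instantiated
    assert isinstance(failure.__cause__, OSError)   # original error preserved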

View File

@@ -59,7 +59,7 @@ def main():
     try:
         consumer.basic_consume_and_publish()
     except Exception as err:
-        raise ConsumerError() from err
+        raise ConsumerError from err

     try:
         consume()

View File

@@ -2,6 +2,8 @@ import logging
 import pytest

+from pyinfra.exceptions import DataLoadingFailure
+
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -43,3 +45,8 @@ class TestStorage:
         storage.put_object(bucket_name, "file2", b"content 2")
         full_names_received = storage.get_all_object_names(bucket_name)
         assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received}
+
+    def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name):
+        storage.clear_bucket(bucket_name)
+        with pytest.raises(DataLoadingFailure):
+            storage.get_object(bucket_name, "folder/file")
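The storage and bucket_name fixtures are defined elsewhere in the test module and are not part of this diff. A hedged sketch of minimal stand-ins, using an in-memory stub instead of a real S3 backend; every name below is an assumption:

# Hypothetical fixtures; the real suite presumably wires up an actual storage
# backend. Only the methods the tests above call are stubbed, and Storage is
# assumed to delegate get_all_object_names and clear_bucket to its adapter.
import pytest

from pyinfra.storage.storage import Storage  # assumed module path


class InMemoryAdapter:
    def __init__(self):
        self._objects = {}

    def put_object(self, bucket_name, object_name, data):
        self._objects[(bucket_name, object_name)] = data

    def get_object(self, bucket_name, object_name):
        # A KeyError here surfaces as DataLoadingFailure via Storage.__get_object.
        return self._objects[(bucket_name, object_name)]

    def get_all_object_names(self, bucket_name):
        return [key for key in self._objects if key[0] == bucket_name]

    def clear_bucket(self, bucket_name):
        self._objects = {k: v for k, v in self._objects.items() if k[0] != bucket_name}


@pytest.fixture
def storage():
    return Storage(InMemoryAdapter())  # assumed constructor signature


@pytest.fixture
def bucket_name():
    return "test-bucket"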