Merge in RR/pyinfra from storage-data-access-failure-dead-letter-queue-fix to master
Squashed commit of the following:
commit 1f1fcdf0357fe7817b36eac7e369d5ef75ec5af4
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon May 2 14:27:29 2022 +0200
fix: storage data access failure now triggers dead letter queue publishing
92 lines
2.8 KiB
Python
92 lines
2.8 KiB
Python
import abc
|
|
import gzip
|
|
import json
|
|
import logging
|
|
from operator import itemgetter
|
|
from typing import Callable
|
|
|
|
from pyinfra.config import CONFIG, parse_disjunction_string
|
|
from pyinfra.exceptions import DataLoadingFailure
|
|
from pyinfra.storage.storage import Storage
|
|
|
|
|
|
def get_object_name(body):
    """Build the storage object name of the file a message body refers to.

    Args:
        body: Mapping that provides the keys "dossierId", "fileId" and
            "targetFileExtension".

    Returns:
        A string of the form "<dossierId>/<fileId>.<targetFileExtension>".

    Raises:
        KeyError: If `body` is a dict and one of the three keys is missing.
    """
    # Look the keys up in the same order the name is assembled in.
    dossier = body["dossierId"]
    file_ = body["fileId"]
    extension = body["targetFileExtension"]
    return f"{dossier}/{file_}.{extension}"
|
|
|
|
|
|
def get_response_object_name(body):
    """Build the storage object name under which a response is stored.

    Args:
        body: Mapping that provides the keys "dossierId", "fileId" and
            "responseFileExtension".

    Returns:
        A string of the form "<dossierId>/<fileId>.<responseFileExtension>".

    Raises:
        KeyError: If `body` is a dict and one of the three keys is missing.
    """
    # Look the keys up in the same order the name is assembled in.
    dossier = body["dossierId"]
    file_ = body["fileId"]
    extension = body["responseFileExtension"]
    return f"{dossier}/{file_}.{extension}"
|
|
|
|
|
|
def get_object_descriptor(body):
    """Describe where the object referenced by a message body is stored.

    Args:
        body: Mapping accepted by `get_object_name`.

    Returns:
        A dict with the keys "bucket_name" (the result of passing
        `CONFIG.storage.bucket` through `parse_disjunction_string`) and
        "object_name" (the result of `get_object_name(body)`).
    """
    bucket = parse_disjunction_string(CONFIG.storage.bucket)
    name = get_object_name(body)
    return {"bucket_name": bucket, "object_name": name}
|
|
|
|
|
|
def get_response_object_descriptor(body):
    """Describe where the response for a message body is stored.

    Args:
        body: Mapping accepted by `get_response_object_name`.

    Returns:
        A dict with the keys "bucket_name" (the result of passing
        `CONFIG.storage.bucket` through `parse_disjunction_string`) and
        "object_name" (the result of `get_response_object_name(body)`).
    """
    # Keyword arguments are evaluated left to right, so the bucket is still
    # resolved before the object name is built.
    return dict(
        bucket_name=parse_disjunction_string(CONFIG.storage.bucket),
        object_name=get_response_object_name(body),
    )
|
|
|
|
|
|
class ResponseStrategy(abc.ABC):
    """Abstract base for callables that post-process a result body.

    Subclasses implement `handle_response`; calling an instance forwards to it.
    """

    def __call__(self, body):
        """Delegate to `handle_response` and return its result."""
        return self.handle_response(body)

    @abc.abstractmethod
    def handle_response(self, body):
        """Handle a result body; must be overridden by concrete strategies.

        The base implementation does nothing and returns None.
        """
|
|
|
|
|
|
class StorageStrategy(ResponseStrategy):
    """Response strategy that writes the full body to storage and returns it without its data."""

    def __init__(self, storage):
        """Keep a reference to the storage object used for `put_object` calls."""
        self.storage = storage

    def handle_response(self, body):
        """Store the gzip-compressed JSON of `body`, then drop its "data" entry.

        Args:
            body: Dict to persist; it must be JSON-serializable and contain
                the key "data".

        Returns:
            The same `body` object, mutated in place so that "data" is removed.

        Raises:
            KeyError: If `body` has no "data" key (raised after the upload).
        """
        target = get_response_object_descriptor(body)
        # The whole body, including "data", is what gets persisted.
        compressed = gzip.compress(json.dumps(body).encode())
        self.storage.put_object(**target, data=compressed)

        body.pop("data")
        return body
|
|
|
|
|
|
class ForwardingStrategy(ResponseStrategy):
    """Response strategy that passes the body through untouched."""

    def handle_response(self, body):
        """Return `body` as is, without copying or modifying it."""
        return body
|
|
|
|
|
|
class QueueVisitor:
    """Callable that loads an object from storage, runs a callback on it and applies a response strategy.

    Calling an instance with a message body downloads and decompresses the
    object described by that body, passes it to `callback`, and gives the
    resulting body to `response_strategy`.
    """

    def __init__(self, storage: Storage, callback: Callable, response_strategy):
        """Store the collaborators.

        Args:
            storage: Object whose `get_object` is called with the descriptor
                from `get_object_descriptor`.
            callback: Called with the body extended by the loaded "data".
            response_strategy: Called with the final result body.
        """
        self.storage = storage
        self.callback = callback
        self.response_strategy = response_strategy

    def load_data(self, body):
        """Download and gzip-decompress the object referenced by `body`.

        Returns:
            The decompressed bytes.

        Raises:
            DataLoadingFailure: If downloading or decompressing raises any
                `Exception`; the original error is chained as the cause.
        """
        descriptor = get_object_descriptor(body)

        try:
            # Logging, download and decompression all sit inside the try so
            # that any failure on the way is reported as DataLoadingFailure.
            logging.debug(f"Downloading {descriptor}...")
            compressed = self.storage.get_object(**descriptor)
            logging.debug(f"Downloaded {descriptor}.")
            return gzip.decompress(compressed)
        except Exception as err:
            logging.warning(f"Loading data from storage failed for {descriptor}.")
            raise DataLoadingFailure from err

    def process_data(self, data, body):
        """Invoke the callback with a copy of `body` whose "data" entry is `data`."""
        payload = {**body, "data": data}
        return self.callback(payload)

    def load_and_process(self, body):
        """Load the stored data, process it and return a new body holding the result.

        Returns:
            A copy of `body` whose "data" entry is the callback's return value.
        """
        loaded = self.load_data(body)
        processed = self.process_data(loaded, body)
        return {**body, "data": processed}

    def __call__(self, body):
        """Run `load_and_process` on `body` and pass the result to the response strategy."""
        outcome = self.load_and_process(body)
        return self.response_strategy(outcome)
|