151 lines
5.3 KiB
Python

import gzip
import json
from functools import singledispatch
from typing import TypedDict
from kn_utils.logging import logger
from pydantic import BaseModel, ValidationError
from pyinfra.storage.storages.storage import Storage
class DossierIdFileIdDownloadPayload(BaseModel):
    """Download payload that addresses the target file by dossier id and file id."""

    # Identifier of the dossier; used as the first path segment.
    dossierId: str
    # Identifier of the file; used as the file name without its extension.
    fileId: str
    # Extension appended to the file id, separated by a dot.
    targetFileExtension: str

    @property
    def targetFilePath(self):
        """Return the download path as ``<dossierId>/<fileId>.<targetFileExtension>``."""
        return f"{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
class TenantIdDossierIdFileIdDownloadPayload(BaseModel):
    """Download payload that addresses the target file by tenant id, dossier id and file id."""

    # Identifier of the tenant; used as the first path segment.
    tenantId: str
    # Identifier of the dossier; used as the second path segment.
    dossierId: str
    # Identifier of the file; used as the file name without its extension.
    fileId: str
    # Extension appended to the file id, separated by a dot.
    targetFileExtension: str

    @property
    def targetFilePath(self):
        """Return the download path as ``<tenantId>/<dossierId>/<fileId>.<targetFileExtension>``."""
        return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
class DossierIdFileIdUploadPayload(BaseModel):
    """Upload payload that addresses the response file by dossier id and file id."""

    # Identifier of the dossier; used as the first path segment.
    dossierId: str
    # Identifier of the file; used as the file name without its extension.
    fileId: str
    # Extension appended to the file id, separated by a dot.
    responseFileExtension: str

    @property
    def responseFilePath(self):
        """Return the upload path as ``<dossierId>/<fileId>.<responseFileExtension>``."""
        return f"{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
class TenantIdDossierIdFileIdUploadPayload(BaseModel):
    """Upload payload that addresses the response file by tenant id, dossier id and file id."""

    # Identifier of the tenant; used as the first path segment.
    tenantId: str
    # Identifier of the dossier; used as the second path segment.
    dossierId: str
    # Identifier of the file; used as the file name without its extension.
    fileId: str
    # Extension appended to the file id, separated by a dot.
    responseFileExtension: str

    @property
    def responseFilePath(self):
        """Return the upload path as ``<tenantId>/<dossierId>/<fileId>.<responseFileExtension>``."""
        return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
class TargetResponseFilePathDownloadPayload(BaseModel):
    """Download payload that carries the storage path(s) directly."""

    # Either one file path, or a dictionary whose values are file paths (several download targets).
    targetFilePath: str | dict[str, str]
class TargetResponseFilePathUploadPayload(BaseModel):
    """Upload payload that carries the storage path of the response file directly."""

    # Full path the response is written to.
    responseFilePath: str
class DownloadedData(TypedDict):
    """Result of a single download: the retrieved content together with the path it came from."""

    # Content of the downloaded object.
    data: bytes
    # Storage path the content was read from.
    file_path: str
def download_data_bytes_as_specified_in_message(
    storage: Storage, raw_payload: dict
) -> dict[str, DownloadedData] | DownloadedData:
    """Download the file(s) referenced by a message payload from storage.

    The payload model is selected from the keys present in ``raw_payload``:

    * ``dossierId`` and ``tenantId`` -> ``TenantIdDossierIdFileIdDownloadPayload``
    * ``dossierId`` without ``tenantId`` -> ``DossierIdFileIdDownloadPayload``
    * no ``dossierId`` -> ``TargetResponseFilePathDownloadPayload``, whose ``targetFilePath`` is either a single
      file path or a dictionary with file paths as values (several download targets).

    Args:
        storage: Storage the data is read from.
        raw_payload: Message payload describing the download target(s).

    Returns:
        For a single target, a ``DownloadedData`` dictionary with the keys ``data`` and ``file_path``.
        For several targets, a dictionary with the same keys as the ``targetFilePath`` dictionary, each mapped to
        the ``DownloadedData`` of the respective file.

    Raises:
        ValueError: If ``raw_payload`` does not validate against the selected payload model.
        FileNotFoundError: If a requested file does not exist in storage.
    """
    if "dossierId" not in raw_payload:
        payload_model = TargetResponseFilePathDownloadPayload
    elif "tenantId" in raw_payload:
        payload_model = TenantIdDossierIdFileIdDownloadPayload
    else:
        payload_model = DossierIdFileIdDownloadPayload

    # Keep the try body minimal: only model validation is expected to raise ValidationError.
    try:
        payload = payload_model(**raw_payload)
    except ValidationError as err:
        # Chain the validation error so the offending fields stay visible in the traceback.
        raise ValueError("No download file path found in payload, nothing to download.") from err

    return _download(payload.targetFilePath, storage)
@singledispatch
def _download(
file_path_or_file_path_dict: str | dict[str, str], storage: Storage
) -> dict[str, DownloadedData] | DownloadedData:
pass
@_download.register(str)
def _download_single_file(file_path: str, storage: Storage) -> DownloadedData:
    """Download a single file from storage.

    Args:
        file_path: Path of the object in storage.
        storage: Storage the object is read from.

    Returns:
        The downloaded content together with the path it was read from.

    Raises:
        FileNotFoundError: If ``storage.exists`` reports that the path is not present.
    """
    if storage.exists(file_path):
        content = storage.get_object(file_path)
        logger.info(f"Downloaded {file_path} from storage.")
        return DownloadedData(data=content, file_path=file_path)

    raise FileNotFoundError(f"File '{file_path}' does not exist in storage.")
@_download.register(dict)
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict[str, DownloadedData]:
    """Download every target of a dictionary and return the results under the same keys.

    Args:
        file_path_dict: Dictionary whose values are download targets.
        storage: Storage the objects are read from.

    Returns:
        A dictionary with the keys of ``file_path_dict``, each mapped to the download result of its value.
    """
    downloads = {}
    for name, target in file_path_dict.items():
        # Each value goes through the dispatcher again, so it is handled according to its own type.
        downloads[name] = _download(target, storage)
    return downloads
def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, data):
    """Upload data to the response file specified in a message payload.

    For now, only json serializable data is supported. The stored json consists of ``raw_payload`` extended with
    a ``data`` key that contains the data to be uploaded. If the response file name ends with ``.gz``, the
    serialized json is gzip-compressed before upload.

    The payload model is selected from the keys present in ``raw_payload``:

    * ``dossierId`` and ``tenantId`` -> ``TenantIdDossierIdFileIdUploadPayload``
    * ``dossierId`` without ``tenantId`` -> ``DossierIdFileIdUploadPayload``
    * no ``dossierId`` -> ``TargetResponseFilePathUploadPayload``

    This function can be extended in the future as needed (e.g. to upload images); it is deliberately kept simple
    until such requirements are actually specified.

    Args:
        storage: Storage the result is written to.
        raw_payload: Message payload describing the upload target.
        data: Json serializable result to store under the ``data`` key.

    Raises:
        ValueError: If ``raw_payload`` does not validate against the selected payload model, or if the response
            file name does not contain ``.json``.
        TypeError: Raised by ``json.dumps`` if the payload or the data is not json serializable.
    """
    if "dossierId" not in raw_payload:
        payload_model = TargetResponseFilePathUploadPayload
    elif "tenantId" in raw_payload:
        payload_model = TenantIdDossierIdFileIdUploadPayload
    else:
        payload_model = DossierIdFileIdUploadPayload

    # Keep the try body minimal: only model validation is expected to raise ValidationError.
    try:
        payload = payload_model(**raw_payload)
    except ValidationError as err:
        # Chain the validation error so the offending fields stay visible in the traceback.
        raise ValueError("No upload file path found in payload, nothing to upload.") from err

    response_file_path = payload.responseFilePath
    # Only the file name decides about format and compression. Directory segments (tenant or dossier ids) that
    # happen to contain '.json' or '.gz' must not influence the decision.
    file_name = response_file_path.rsplit("/", 1)[-1]
    if ".json" not in file_name:
        raise ValueError("Only json serializable data can be uploaded.")

    serialized = json.dumps({**raw_payload, "data": data}).encode("utf-8")
    # Compress only when the file name actually ends with the gzip suffix, not when '.gz' appears anywhere in it.
    if file_name.endswith(".gz"):
        serialized = gzip.compress(serialized)

    storage.put_object(response_file_path, serialized)
    logger.info(f"Uploaded {response_file_path} to storage.")