refactorig WIP 3: parametrized storage; added azure BS; updated reqs

This commit is contained in:
Matthias Bisping 2022-02-16 16:28:40 +01:00
parent b1ade1fb65
commit 61e03f9c47
11 changed files with 148 additions and 15 deletions

View File

@ -1,5 +1,4 @@
rabbitmq:
host: $RABBITMQ_HOST|localhost # RabbitMQ host address
port: $RABBITMQ_PORT|5672 # RabbitMQ host port
user: $RABBITMQ_USERNAME|user # RabbitMQ username
@ -21,10 +20,10 @@ minio:
bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages
logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log)
name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster
storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from
webserver:
host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address

View File

@ -3,8 +3,8 @@ import logging
import tempfile
from operator import itemgetter
from pyinfra.minio import MinioHandle
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
from pyinfra.storage.storage import StorageHandle
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
@ -21,9 +21,7 @@ def make_callback(body_processor, output_queue_name):
return callback
def make_storage_callback(output_queue_name):
storage_client = MinioHandle()
def make_storage_callback(output_queue_name, storage: StorageHandle):
def process(payload):
@ -36,7 +34,7 @@ def make_storage_callback(output_queue_name):
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
downloaded_file_path = download(storage, object_name, pdf_compressed_dir)
unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
return json.dumps(unzipped_file_path)

View File

@ -8,7 +8,7 @@ import pika
from pyinfra.config import CONFIG
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
from pyinfra.minio import MinioHandle
from pyinfra.storage.minio import MinioHandle
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:

View File

View File

@ -0,0 +1,113 @@
"""Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by the service."""
import os
from typing import Iterable
from azure.storage.blob import BlobServiceClient, ContainerClient
from pyinfra.config import CONFIG
from pyinfra.storage.storage import StorageHandle
def get_blob_service_client(connection_string=None) -> BlobServiceClient:
"""Instantiates a minio.Minio client.
Args:
connection_string: Azure blob storage connection string.
Returns:
A minio.Minio client.
"""
connection_string = CONFIG.azure_blob_storage.connection_string if connection_string is None else connection_string
return BlobServiceClient.from_connection_string(conn_str=connection_string)
class AzureBlobStorageHandle(StorageHandle):
"""Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by
the service.
"""
def __init__(self):
"""Initializes an AzureBlobStorageHandle"""
super().__init__()
self.client: BlobServiceClient = get_blob_service_client()
self.default_container_name = CONFIG.azure_blob_storage.container
self.backend = "azure"
def __get_container_client(self, container_name) -> ContainerClient:
if container_name is None:
container_name = self.default_container_name
container_client = self._StorageHandle__provide_container(container_name)
return container_client
def _StorageHandle__provide_container(self, container_name: str):
container_client = self.client.get_container_client(container_name)
return container_client if container_client.exists() else self.client.create_container(container_name)
def _StorageHandle__add_file(self, path, storage_path, container_name: str = None):
container_client = self.__get_container_client(container_name)
blob_client = container_client.get_blob_client(storage_path)
with open(path, "rb") as f:
blob_client.upload_blob(f, overwrite=True)
def list_files(self, container_name=None) -> Iterable[str]:
"""List all files in a container.
Args:
container_name: container to list files from.
Returns:
Iterable of filenames.
"""
return self._list_files("name", container_name=container_name)
def get_objects(self, container_name=None):
"""List all files in a container_name.
Args:
container_name: Container to list files from.
Returns:
Iterable over all objects in the container.
"""
container_client = self.__get_container_client(container_name)
blobs = container_client.list_blobs()
yield from blobs
def _StorageHandle__list_containers(self):
return self.client.list_containers()
def _StorageHandle__purge(self) -> None:
"""Deletes all files and buckets in the store."""
for container in self.client.list_containers():
self.client.delete_container(container.name)
def _StorageHandle__fget_object(self, container_name: str, object_name: str, target_path):
container_client = self.__get_container_client(container_name)
blob_client = container_client.get_blob_client(object_name)
with open(target_path, "wb") as f:
blob_data = blob_client.download_blob()
blob_data.readinto(f)
def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
"""Removes a file from the store.
Args:
folder: Folder containing file.
filename: Name of file (without folder) to remove.
container_name: container containing file.
"""
if container_name is None:
container_name = self.default_container_name
path = os.path.join(folder, filename)
container_client = self.__get_container_client(container_name)
blob_client = container_client.get_blob_client(path)
if blob_client.exists():
container_client.delete_blob(blob_client)

View File

@ -5,7 +5,7 @@ from typing import Iterable
from minio import Minio
from pyinfra.config import CONFIG
from pyinfra.storage import StorageHandle
from pyinfra.storage.storage import StorageHandle
def get_minio_client(access_key=None, secret_key=None) -> Minio:

View File

@ -1,7 +1,9 @@
pika==1.2.0
retry==0.9.2
envyaml==1.8.210417
minio==7.1.1
Flask==2.0.2
envyaml==1.10.211231
minio==7.1.3
Flask==2.0.3
waitress==2.0.0
tqdm==4.62.3
azure-core==1.22.1
azure-storage-blob==12.9.0

View File

@ -3,7 +3,7 @@ import os
from tqdm import tqdm
from pyinfra.minio import MinioHandle
from pyinfra.storage.minio import MinioHandle
def parse_args():

View File

@ -1,7 +1,7 @@
import json
from pyinfra.config import CONFIG
from pyinfra.minio import MinioHandle
from pyinfra.storage.minio import MinioHandle
from pyinfra.rabbitmq import make_channel, declare_queue, make_connection

View File

@ -8,6 +8,10 @@ from waitress import serve
from pyinfra.config import CONFIG
from pyinfra.consume import consume, ConsumerError
from pyinfra.callback import make_callback, make_storage_callback
from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle
from pyinfra.storage.minio import MinioHandle
# from image_service.storage.azure_blob_storage import AzureBlobStorageHandle
# TODO: implement meaningful checks
@ -33,9 +37,26 @@ def start_integrity_checks_webserver(mode="debug"):
serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port)
class UnknownStorageBackend(ValueError):
pass
def get_storage():
storage_backend = CONFIG.service.storage_backend
if storage_backend == "s3":
storage = MinioHandle()
elif storage_backend == "azure":
storage = AzureBlobStorageHandle()
else:
raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.")
return storage
def main():
callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output)
callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output, storage=get_storage())
webserver = Process(target=start_integrity_checks_webserver, args=("debug", ))
webserver.start()