diff --git a/config.yaml b/config.yaml index 8dfbd51..f9a0671 100755 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,4 @@ rabbitmq: - host: $RABBITMQ_HOST|localhost # RabbitMQ host address port: $RABBITMQ_PORT|5672 # RabbitMQ host port user: $RABBITMQ_USERNAME|user # RabbitMQ username @@ -21,10 +20,10 @@ minio: bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket service: - logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log) name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster + storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from webserver: host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address diff --git a/pyinfra/callback.py b/pyinfra/callback.py index 20548c5..cbf76e6 100644 --- a/pyinfra/callback.py +++ b/pyinfra/callback.py @@ -3,8 +3,8 @@ import logging import tempfile from operator import itemgetter -from pyinfra.minio import MinioHandle from pyinfra.rabbitmq import make_connection, make_channel, declare_queue +from pyinfra.storage.storage import StorageHandle from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip @@ -21,9 +21,7 @@ def make_callback(body_processor, output_queue_name): return callback -def make_storage_callback(output_queue_name): - - storage_client = MinioHandle() +def make_storage_callback(output_queue_name, storage: StorageHandle): def process(payload): @@ -36,7 +34,7 @@ def make_storage_callback(output_queue_name): dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id) - downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir) + downloaded_file_path = download(storage, object_name, pdf_compressed_dir) unzipped_file_path = unzip(downloaded_file_path, 
pdf_dir) return json.dumps(unzipped_file_path) diff --git a/pyinfra/rabbitmq.py b/pyinfra/rabbitmq.py index 1114a16..8a1df72 100644 --- a/pyinfra/rabbitmq.py +++ b/pyinfra/rabbitmq.py @@ -8,7 +8,7 @@ import pika from pyinfra.config import CONFIG from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip -from pyinfra.minio import MinioHandle +from pyinfra.storage.minio import MinioHandle def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: diff --git a/pyinfra/storage/__init__.py b/pyinfra/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/storage/azure_blob_storage.py b/pyinfra/storage/azure_blob_storage.py new file mode 100644 index 0000000..f3fa40b --- /dev/null +++ b/pyinfra/storage/azure_blob_storage.py @@ -0,0 +1,113 @@ +"""Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by the service.""" +import os +from typing import Iterable + +from azure.storage.blob import BlobServiceClient, ContainerClient + +from pyinfra.config import CONFIG +from pyinfra.storage.storage import StorageHandle + + +def get_blob_service_client(connection_string=None) -> BlobServiceClient: + """Instantiates an Azure BlobServiceClient. + + Args: + connection_string: Azure blob storage connection string; defaults to CONFIG.azure_blob_storage.connection_string. + + Returns: + A BlobServiceClient. + """ + connection_string = CONFIG.azure_blob_storage.connection_string if connection_string is None else connection_string + return BlobServiceClient.from_connection_string(conn_str=connection_string) + + +class AzureBlobStorageHandle(StorageHandle): + """Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by + the service. 
+ """ + + def __init__(self): + """Initializes an AzureBlobStorageHandle""" + super().__init__() + self.client: BlobServiceClient = get_blob_service_client() + self.default_container_name = CONFIG.azure_blob_storage.container + self.backend = "azure" + + def __get_container_client(self, container_name) -> ContainerClient: + + if container_name is None: + container_name = self.default_container_name + + container_client = self._StorageHandle__provide_container(container_name) + return container_client + + def _StorageHandle__provide_container(self, container_name: str): + container_client = self.client.get_container_client(container_name) + return container_client if container_client.exists() else self.client.create_container(container_name) + + def _StorageHandle__add_file(self, path, storage_path, container_name: str = None): + + container_client = self.__get_container_client(container_name) + blob_client = container_client.get_blob_client(storage_path) + + with open(path, "rb") as f: + blob_client.upload_blob(f, overwrite=True) + + def list_files(self, container_name=None) -> Iterable[str]: + """List all files in a container. + + Args: + container_name: Container to list files from. + + Returns: + Iterable of filenames. + """ + return self._list_files("name", container_name=container_name) + + def get_objects(self, container_name=None): + """Yield all objects in a container. + + Args: + container_name: Container to list objects from. + + Returns: + Iterable over all objects in the container. 
+ """ + container_client = self.__get_container_client(container_name) + blobs = container_client.list_blobs() + yield from blobs + + def _StorageHandle__list_containers(self): + return self.client.list_containers() + + def _StorageHandle__purge(self) -> None: + """Deletes all containers, and with them all files, in the store.""" + for container in self.client.list_containers(): + self.client.delete_container(container.name) + + def _StorageHandle__fget_object(self, container_name: str, object_name: str, target_path): + + container_client = self.__get_container_client(container_name) + blob_client = container_client.get_blob_client(object_name) + + with open(target_path, "wb") as f: + blob_data = blob_client.download_blob() + blob_data.readinto(f) + + def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None: + """Removes a file from the store. + + Args: + folder: Folder containing the file. + filename: Name of the file (without folder) to remove. + container_name: Container containing the file. 
+ """ + if container_name is None: + container_name = self.default_container_name + + path = os.path.join(folder, filename) + + container_client = self.__get_container_client(container_name) + blob_client = container_client.get_blob_client(path) + if blob_client.exists(): + container_client.delete_blob(blob_client) diff --git a/pyinfra/minio.py b/pyinfra/storage/minio.py similarity index 98% rename from pyinfra/minio.py rename to pyinfra/storage/minio.py index 4455257..3a6eed9 100644 --- a/pyinfra/minio.py +++ b/pyinfra/storage/minio.py @@ -5,7 +5,7 @@ from typing import Iterable from minio import Minio from pyinfra.config import CONFIG -from pyinfra.storage import StorageHandle +from pyinfra.storage.storage import StorageHandle def get_minio_client(access_key=None, secret_key=None) -> Minio: diff --git a/pyinfra/storage.py b/pyinfra/storage/storage.py similarity index 100% rename from pyinfra/storage.py rename to pyinfra/storage/storage.py diff --git a/requirements.txt b/requirements.txt index ae2cb1a..1fdef37 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,9 @@ pika==1.2.0 retry==0.9.2 -envyaml==1.8.210417 -minio==7.1.1 -Flask==2.0.2 +envyaml==1.10.211231 +minio==7.1.3 +Flask==2.0.3 waitress==2.0.0 tqdm==4.62.3 +azure-core==1.22.1 +azure-storage-blob==12.9.0 diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 278f32c..6433404 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -3,7 +3,7 @@ import os from tqdm import tqdm -from pyinfra.minio import MinioHandle +from pyinfra.storage.minio import MinioHandle def parse_args(): diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 34bbe32..d914c69 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -1,7 +1,7 @@ import json from pyinfra.config import CONFIG -from pyinfra.minio import MinioHandle +from pyinfra.storage.minio import MinioHandle from pyinfra.rabbitmq import make_channel, declare_queue, make_connection diff --git a/src/serve.py 
b/src/serve.py index 1772242..da7fd5b 100644 --- a/src/serve.py +++ b/src/serve.py @@ -8,6 +8,10 @@ from waitress import serve from pyinfra.config import CONFIG from pyinfra.consume import consume, ConsumerError from pyinfra.callback import make_callback, make_storage_callback +from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle + +from pyinfra.storage.minio import MinioHandle +# from image_service.storage.azure_blob_storage import AzureBlobStorageHandle # TODO: implement meaningful checks @@ -33,9 +37,26 @@ def start_integrity_checks_webserver(mode="debug"): serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port) +class UnknownStorageBackend(ValueError): + pass + + +def get_storage(): + + storage_backend = CONFIG.service.storage_backend + if storage_backend == "s3": + storage = MinioHandle() + elif storage_backend == "azure": + storage = AzureBlobStorageHandle() + else: + raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.") + + return storage + + def main(): - callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output) + callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output, storage=get_storage()) webserver = Process(target=start_integrity_checks_webserver, args=("debug", )) webserver.start()