"""Implements a wrapper around a MinIO client that provides operations on the MinIO store required by the service.""" import os from typing import Iterable from minio import Minio from mini_queue.utils.config import CONFIG from mini_queue.utils.storage import StorageHandle def get_minio_client(access_key=None, secret_key=None) -> Minio: """Instantiates a minio.Minio client. Args: access_key: Access key for MinIO client (username). secret_key: Secret key for MinIO client (password). Returns: A minio.Minio client. """ access_key = CONFIG.minio.user if access_key is None else access_key secret_key = CONFIG.minio.password if secret_key is None else secret_key # TODO: secure=True/False? return Minio(f"{CONFIG.minio.host}:{CONFIG.minio.port}", access_key=access_key, secret_key=secret_key, secure=False) class MinioHandle(StorageHandle): """Wrapper around a MinIO client that provides operations on the MinIO store required by the service.""" def __init__(self): """Initializes a MinioHandle""" super().__init__() self.client: Minio = get_minio_client() self.default_container_name = CONFIG.minio.bucket self.backend = "s3" def _StorageHandle__provide_container(self, container_name): if not self.client.bucket_exists(container_name): self.client.make_bucket(container_name) def _StorageHandle__add_file(self, path, storage_path, container_name=None): if container_name is None: container_name = self.default_container_name self._StorageHandle__provide_container(container_name) with open(path, "rb") as f: stat = os.stat(path) self.client.put_object(container_name, storage_path, f, stat.st_size) def list_files(self, container_name=None) -> Iterable[str]: """List all files in a container. Args: container_name: container to list files from. Returns: Iterable of filenames. """ return self._list_files("object_name", container_name=container_name) def get_objects(self, container_name=None): """Gets all objects in a container. Args: container_name: container to get objects from. Returns: Iterable over all objects in the container. """ if container_name is None: container_name = self.default_container_name yield from self.client.list_objects(container_name, recursive=True) def _StorageHandle__list_containers(self): return self.client.list_buckets() def _StorageHandle__purge(self) -> None: """Deletes all files and containers in the store.""" for container, obj in self.get_all_objects(): self.client.remove_object(container.name, obj.object_name) for container in self.client.list_buckets(): self.client.remove_bucket(container.name) def _StorageHandle__fget_object(self, container_name, object_name, target_path): self.client.fget_object(container_name, object_name, target_path) def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None: """Removes a file from the store. Args: folder: Folder containing file. filename: Name of file (without folder) to remove. container_name: container containing file. """ if container_name is None: container_name = self.default_container_name path = os.path.join(folder, filename) if self.client.bucket_exists(container_name): self.client.remove_object(container_name, path)