From dca206cda53e5dced9e70edb7e3588f8b47ad8a6 Mon Sep 17 00:00:00 2001
From: Matthias Bisping
Date: Fri, 25 Feb 2022 12:58:55 +0100
Subject: [PATCH] Pull request #11: storage refactoring
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Merge in RR/pyinfra from s3_refactoring to master

Squashed commit of the following:

commit d9224b879d47c6f1574d78b7239a209f800972dc
Author: Matthias Bisping
Date: Fri Feb 25 12:52:57 2022 +0100

    fixed broken reference

commit 4189ebb0481c85b5984adb8c6008bbc9bae6d71b
Author: Matthias Bisping
Date: Fri Feb 25 12:51:11 2022 +0100

    renaming

commit 945f2397c5301409be19229e11bb50a733f6e2ff
Author: Matthias Bisping
Date: Fri Feb 25 12:48:13 2022 +0100

    renaming

commit 3d86500b4b703a56cb32d998c1c12e8fd06e3b2c
Author: Matthias Bisping
Date: Fri Feb 25 12:45:32 2022 +0100

    renaming

commit 4b26acca978ad49bb0b6e785addcb601d90214db
Author: Matthias Bisping
Date: Fri Feb 25 12:39:08 2022 +0100

    renaming

commit 81be5e55b6d70ccb9f8b07b5860f89da7231122f
Author: Matthias Bisping
Date: Fri Feb 25 11:44:46 2022 +0100

    renaming

commit 23b35b1f91def3a6185315772c1249826637e627
Author: Matthias Bisping
Date: Fri Feb 25 11:31:15 2022 +0100

    added teardown step of clearing test bucket to storage tests; added tests for nested paths

commit 6e26c1a6962bfe867827f80a38c091f63fa395f0
Author: Julius Unverfehrt
Date: Fri Feb 25 10:56:32 2022 +0100

    black & removed käsebrot

commit 519c88b39933fa699e6eff5d29bfea7ed1ca8719
Author: Julius Unverfehrt
Date: Fri Feb 25 10:55:18 2022 +0100

    data loading/publishing adjusted to new storage logic

commit c530b5929a6f892663009eef4b92f04b4f7f533c
Author: Julius Unverfehrt
Date: Thu Feb 24 17:24:47 2022 +0100

    removing old storage logic WIP

commit 880e22832cd203dc0d0471729996cbd0b5f455f1
Author: Julius Unverfehrt
Date: Thu Feb 24 17:12:44 2022 +0100

    manage minio script refactor

commit 01149770750bdf88094376b1f731ace343f32d76
Author: Julius Unverfehrt
Date: Thu Feb 24 16:32:23 2022 +0100

    renaming

commit 7d6dbabf30d928cb699564e2e7a668253ec6990b
Author: Julius Unverfehrt
Date: Thu Feb 24 16:26:10 2022 +0100

    added list bucket files & refactored serve.py to use new storage logic

commit b20d6fd26b4523538fd02acc5889b932de2e3f03
Author: Matthias Bisping
Date: Thu Feb 24 15:42:23 2022 +0100

    added compose file reference to pytest.ini; added coveragerc

commit ff1c7e319bd229bc5dd905ddbc04c211b78f88cd
Author: Matthias Bisping
Date: Thu Feb 24 15:41:13 2022 +0100

    added pytest.ini

commit d32828ad917a20cf16785a31008d90b4f2ed4442
Author: Julius Unverfehrt
Date: Thu Feb 24 15:37:30 2022 +0100

    black

commit 36931850c3d720b76d39a9f503abeaa326132531
Author: Julius Unverfehrt
Date: Thu Feb 24 15:35:55 2022 +0100

    get storage test added

commit 0eb7aaf795217bb34f8f73b5e1e90576b55e9374
Author: Matthias Bisping
Date: Thu Feb 24 15:08:29 2022 +0100

    added fixture magic for testing different S3 backends

commit f3f1c42e574365702be632a416b07f025746e989
Author: Matthias Bisping
Date: Thu Feb 24 14:47:03 2022 +0100

    defined test sections in config for endpoints of storages

commit 16e34052da2cc8c61473d21c1929023445e619d6
Author: Matthias Bisping
Date: Thu Feb 24 14:11:53 2022 +0100

    refactoring; implemented S3 handle

...
and 18 more commits --- .coveragerc | 54 +++++ config.yaml | 29 ++- pyinfra/config.py | 3 + pyinfra/core.py | 4 +- pyinfra/exceptions.py | 8 + pyinfra/storage/adapters/__init__.py | 0 pyinfra/storage/adapters/adapter.py | 34 ++++ pyinfra/storage/adapters/azure.py | 79 +++++++ pyinfra/storage/adapters/s3.py | 55 +++++ pyinfra/storage/azure_blob_storage.py | 121 ----------- pyinfra/storage/clients/__init__.py | 0 pyinfra/storage/clients/azure.py | 9 + pyinfra/storage/clients/s3.py | 27 +++ pyinfra/storage/minio.py | 119 ----------- pyinfra/storage/storage.py | 192 ++---------------- pyinfra/storage/storages.py | 14 ++ pyinfra/test/__init__.py | 0 pyinfra/test/storage/__init__.py | 0 pyinfra/test/storage/adapter_mock.py | 30 +++ pyinfra/test/storage/client_mock.py | 27 +++ pyinfra/test/unit_tests/__init__.py | 0 pyinfra/test/unit_tests/azure_adapter_test.py | 10 + pyinfra/test/unit_tests/conftest.py | 6 + .../test/unit_tests/storage_adapter_test.py | 1 + pyinfra/test/unit_tests/storage_test.py | 82 ++++++++ pyinfra/utils/file.py | 20 +- pytest.ini | 4 + scripts/manage_minio.py | 36 ++-- scripts/mock_client.py | 7 +- src/serve.py | 13 +- 30 files changed, 523 insertions(+), 461 deletions(-) create mode 100644 .coveragerc create mode 100644 pyinfra/storage/adapters/__init__.py create mode 100644 pyinfra/storage/adapters/adapter.py create mode 100644 pyinfra/storage/adapters/azure.py create mode 100644 pyinfra/storage/adapters/s3.py delete mode 100644 pyinfra/storage/azure_blob_storage.py create mode 100644 pyinfra/storage/clients/__init__.py create mode 100644 pyinfra/storage/clients/azure.py create mode 100644 pyinfra/storage/clients/s3.py delete mode 100644 pyinfra/storage/minio.py create mode 100644 pyinfra/storage/storages.py create mode 100644 pyinfra/test/__init__.py create mode 100644 pyinfra/test/storage/__init__.py create mode 100644 pyinfra/test/storage/adapter_mock.py create mode 100644 pyinfra/test/storage/client_mock.py create mode 100644 pyinfra/test/unit_tests/__init__.py create mode 100644 pyinfra/test/unit_tests/azure_adapter_test.py create mode 100644 pyinfra/test/unit_tests/conftest.py create mode 100644 pyinfra/test/unit_tests/storage_adapter_test.py create mode 100644 pyinfra/test/unit_tests/storage_test.py create mode 100644 pytest.ini diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..41d11e8 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,54 @@ +# .coveragerc to control coverage.py +[run] +branch = True +omit = + */site-packages/* + */distutils/* + */test/* + */__init__.py + */setup.py + */venv/* + */env/* + */build_venv/* + */build_env/* +source = + pyinfra + src +relative_files = True +data_file = .coverage + +[report] +# Regexes for lines to exclude from consideration +exclude_lines = + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain about missing debug-only code: + def __repr__ + if self\.debug + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + + # Don't complain if non-runnable code isn't run: + if 0: + if __name__ == .__main__.: +omit = + */site-packages/* + */distutils/* + */test/* + */__init__.py + */setup.py + */venv/* + */env/* + */build_venv/* + */build_env/* + +ignore_errors = True + +[html] +directory = reports + +[xml] +output = reports/coverage.xml diff --git a/config.yaml b/config.yaml index 5cbe777..acad3be 100755 --- a/config.yaml +++ b/config.yaml @@ -16,10 +16,10 @@ rabbitmq: enabled: $RETRY|False # Toggles retry behaviour 
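Note: the configuration above appears to follow a $NAME|default convention, i.e. read the environment variable if set, otherwise fall back to the literal after the pipe. That reading is inferred from the comments in config.yaml; the actual resolver lives in pyinfra/config.py and is not shown in this diff. A minimal sketch of a resolver with those assumed semantics:

    import os
    import re

    def resolve(value: str) -> str:
        # "$STORAGE_KEY|root" -> os.environ["STORAGE_KEY"] if set, else "root"
        match = re.fullmatch(r"\$(?P<var>\w+)\|(?P<default>.*)", value)
        if not match:
            return value  # plain literal, no interpolation
        return os.environ.get(match.group("var"), match.group("default"))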
max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue -minio: - endpoint: $STORAGE_ENDPOINT|"127.0.0.1:9000" # MinIO endpoint - user: $STORAGE_KEY|root # MinIO user name - password: $STORAGE_SECRET|password # MinIO user password +s3: + endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000" # MinIO endpoint + access_key: $STORAGE_KEY|root # MinIO user name + secret_key: $STORAGE_SECRET|password # MinIO user password bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket azure_blob_storage: @@ -31,8 +31,29 @@ service: name: $SERVICE_NAME|pyinfra-service-v1 # Name of the service in the kubernetes cluster storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000" + bucket_name: $STORAGE_BUCKET_NAME|"pyinfra-test-bucket" # TODO Maybe refactor! + target_file_extension: $TARGET_FILE_EXTENSION|"pdf" # Defines type of file pulled from storage probing_webserver: host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address port: $PROBING_WEBSERVER_PORT|8080 # Probe webserver port mode: $PROBING_WEBSERVER_MODE|production # webserver mode: {development, production} + +test: + minio: + s3: + endpoint: "http://127.0.0.1:9000" + access_key: root + secret_key: password + + aws: + s3: + endpoint: https://s3.amazonaws.com + access_key: AKIA4QVP6D4LCDAGYGN2 + secret_key: 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 + + azure: + azure_blob_storage: + connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" + + bucket: "pyinfra-test-bucket" diff --git a/pyinfra/config.py b/pyinfra/config.py index 2e5f071..c613a49 100644 --- a/pyinfra/config.py +++ b/pyinfra/config.py @@ -24,6 +24,9 @@ class DotIndexable: def __repr__(self): return self.x.__repr__() + def __getitem__(self, item): + return self.__getattr__(item) + class Config: def __init__(self, config_path): diff --git a/pyinfra/core.py b/pyinfra/core.py index 63a83ae..fe85a9d 100644 --- a/pyinfra/core.py +++ b/pyinfra/core.py @@ -9,7 +9,7 @@ from pyinfra.exceptions import DataLoadingFailure, AnalysisFailure, ProcessingFa from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name -def make_storage_data_loader(storage): +def make_storage_data_loader(storage, bucket_name): def get_object_name(payload: dict) -> str: dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id) @@ -18,7 +18,7 @@ def make_storage_data_loader(storage): def download(payload): object_name = get_object_name(payload) logging.debug(f"Downloading {object_name}...") - data = storage.get_object(object_name) + data = storage.get_object(bucket_name, object_name) logging.debug(f"Downloaded {object_name}.") return data diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py index b97154f..8c66c79 100644 --- a/pyinfra/exceptions.py +++ b/pyinfra/exceptions.py @@ -12,3 +12,11 @@ class ProcessingFailure(Exception): class UnknownStorageBackend(ValueError): pass + + +class InvalidEndpoint(ValueError): + pass + + +class UnknownClient(ValueError): + pass diff --git a/pyinfra/storage/adapters/__init__.py b/pyinfra/storage/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/storage/adapters/adapter.py b/pyinfra/storage/adapters/adapter.py new 
file mode 100644 index 0000000..4c27026 --- /dev/null +++ b/pyinfra/storage/adapters/adapter.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod + + +class StorageAdapter(ABC): + def __init__(self, client): + self.__client = client + + @abstractmethod + def make_bucket(self, bucket_name): + pass + + @abstractmethod + def has_bucket(self, bucket_name): + pass + + @abstractmethod + def put_object(self, bucket_name, object_name, data): + pass + + @abstractmethod + def get_object(self, bucket_name, object_name): + pass + + @abstractmethod + def get_all_objects(self, bucket_name): + pass + + @abstractmethod + def clear_bucket(self, bucket_name): + pass + + @abstractmethod + def get_all_object_names(self, bucket_name): + pass diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py new file mode 100644 index 0000000..532da38 --- /dev/null +++ b/pyinfra/storage/adapters/azure.py @@ -0,0 +1,79 @@ +from itertools import repeat +import logging +from functools import wraps +from operator import attrgetter + +from azure.storage.blob import ContainerClient, BlobServiceClient +from retry import retry + +from pyinfra.storage.adapters.adapter import StorageAdapter + +logger = logging.getLogger(__name__) +logging.getLogger("azure").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + + +def _retry(exceptions=Exception): + def inner(func): + @retry(exceptions=exceptions, delay=5, jitter=(0, 3), max_delay=60) + @wraps(func) + def inner(*args, **kwargs): + return func(*args, **kwargs) + + return inner + + return inner + + +class AzureStorageAdapter(StorageAdapter): + def __init__(self, client): + super().__init__(client=client) + self.__client: BlobServiceClient = self._StorageAdapter__client + + def has_bucket(self, bucket_name): + container_client = self.__client.get_container_client(bucket_name) + return container_client.exists() + + def make_bucket(self, bucket_name): + container_client = self.__client.get_container_client(bucket_name) + container_client if container_client.exists() else self.__client.create_container(bucket_name) + + def __provide_container_client(self, bucket_name) -> ContainerClient: + self.make_bucket(bucket_name) + container_client = self.__client.get_container_client(bucket_name) + return container_client + + def put_object(self, bucket_name, object_name, data): + logger.debug(f"Uploading '{object_name}'...") + container_client = self.__provide_container_client(bucket_name) + blob_client = container_client.get_blob_client(object_name) + blob_client.upload_blob(data, overwrite=True) + + def get_object(self, bucket_name, object_name): + logger.debug(f"Downloading '{object_name}'...") + container_client = self.__provide_container_client(bucket_name) + blob_client = container_client.get_blob_client(object_name) + blob_data = blob_client.download_blob() + return blob_data.readall() + + def get_all_objects(self, bucket_name): + + container_client = self.__provide_container_client(bucket_name) + blobs = container_client.list_blobs() + for blob in blobs: + logger.debug(f"Downloading '{blob.name}'...") + blob_client = container_client.get_blob_client(blob) + blob_data = blob_client.download_blob() + data = blob_data.readall() + yield data + + def clear_bucket(self, bucket_name): + logger.debug(f"Clearing Azure container '{bucket_name}'...") + container_client = self.__client.get_container_client(bucket_name) + blobs = container_client.list_blobs() + container_client.delete_blobs(*blobs) + + def get_all_object_names(self, 
bucket_name): + container_client = self.__provide_container_client(bucket_name) + blobs = container_client.list_blobs() + return zip(repeat(bucket_name), map(attrgetter("name"), blobs)) diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/adapters/s3.py new file mode 100644 index 0000000..04ecbb4 --- /dev/null +++ b/pyinfra/storage/adapters/s3.py @@ -0,0 +1,55 @@ +import io +from itertools import repeat +import logging +from operator import attrgetter + +from minio import Minio + +from pyinfra.storage.adapters.adapter import StorageAdapter + +logger = logging.getLogger(__name__) + + +class S3StorageAdapter(StorageAdapter): + def __init__(self, client): + super().__init__(client=client) + self.__client: Minio = self._StorageAdapter__client + + def make_bucket(self, bucket_name): + if not self.has_bucket(bucket_name): + self.__client.make_bucket(bucket_name) + + def has_bucket(self, bucket_name): + return self.__client.bucket_exists(bucket_name) + + def put_object(self, bucket_name, object_name, data): + logger.debug(f"Uploading '{object_name}'...") + data = io.BytesIO(data) + self.__client.put_object(bucket_name, object_name, data, length=data.getbuffer().nbytes) + + def get_object(self, bucket_name, object_name): + logger.debug(f"Downloading '{object_name}'...") + response = None + + try: + response = self.__client.get_object(bucket_name, object_name) + return response.data + finally: + if response: + response.close() + response.release_conn() + + def get_all_objects(self, bucket_name): + for obj in self.__client.list_objects(bucket_name, recursive=True): + logger.debug(f"Downloading '{obj.object_name}'...") + yield self.get_object(bucket_name, obj.object_name) + + def clear_bucket(self, bucket_name): + logger.debug(f"Clearing S3 bucket '{bucket_name}'...") + objects = self.__client.list_objects(bucket_name, recursive=True) + for obj in objects: + self.__client.remove_object(bucket_name, obj.object_name) + + def get_all_object_names(self, bucket_name): + objs = self.__client.list_objects(bucket_name, recursive=True) + return zip(repeat(bucket_name), map(attrgetter("object_name"), objs)) diff --git a/pyinfra/storage/azure_blob_storage.py b/pyinfra/storage/azure_blob_storage.py deleted file mode 100644 index 54805c6..0000000 --- a/pyinfra/storage/azure_blob_storage.py +++ /dev/null @@ -1,121 +0,0 @@ -"""Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by the service.""" -import os -from typing import Iterable - -from azure.storage.blob import BlobServiceClient, ContainerClient - -from pyinfra.config import CONFIG -from pyinfra.storage.storage import StorageHandle - - -def get_blob_service_client(connection_string=None) -> BlobServiceClient: - """Instantiates a minio.Minio client. - - Args: - connection_string: Azure blob storage connection string. - - Returns: - A minio.Minio client. - """ - connection_string = CONFIG.azure_blob_storage.connection_string if connection_string is None else connection_string - return BlobServiceClient.from_connection_string(conn_str=connection_string) - - -class AzureBlobStorageHandle(StorageHandle): - """Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by - the service. 
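Note: both adapters above reach the client held by the base class through its name-mangled attribute, self._StorageAdapter__client. A double-underscore attribute assigned in StorageAdapter.__init__ is stored as _StorageAdapter__client, so a subclass writing self.__client would create a different attribute (_S3StorageAdapter__client and so on) and must spell out the mangled name instead. A minimal illustration of the mechanism, with hypothetical class names:

    class Base:
        def __init__(self, client):
            self.__client = client  # actually stored as self._Base__client

    class Child(Base):
        def __init__(self, client):
            super().__init__(client)
            # within Child, self.__client mangles to _Child__client, so the
            # base attribute has to be addressed by its mangled name:
            self.__client = self._Base__client

A single protected attribute (self._client) on StorageAdapter would make these mangled lookups unnecessary. Relatedly, AzureStorageAdapter.make_bucket uses a conditional expression purely for its side effect; an explicit "if not container_client.exists(): ... create_container(bucket_name)" would state the intent more directly.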
- """ - - def __init__(self): - """Initializes an AzureBlobStorageHandle""" - super().__init__() - self.client: BlobServiceClient = get_blob_service_client() - self.default_container_name = CONFIG.azure_blob_storage.container - self.backend = "azure" - - def __get_container_client(self, container_name) -> ContainerClient: - - if container_name is None: - container_name = self.default_container_name - - container_client = self._StorageHandle__provide_container(container_name) - return container_client - - def _StorageHandle__provide_container(self, container_name: str): - container_client = self.client.get_container_client(container_name) - return container_client if container_client.exists() else self.client.create_container(container_name) - - def _StorageHandle__add_file(self, path, storage_path, container_name: str = None): - - container_client = self.__get_container_client(container_name) - blob_client = container_client.get_blob_client(storage_path) - - with open(path, "rb") as f: - blob_client.upload_blob(f, overwrite=True) - - def list_files(self, container_name=None) -> Iterable[str]: - """List all files in a container. - - Args: - container_name: container to list files from. - - Returns: - Iterable of filenames. - """ - return self._list_files("name", container_name=container_name) - - def get_objects(self, container_name=None): - """List all files in a container_name. - - Args: - container_name: Container to list files from. - - Returns: - Iterable over all objects in the container. - """ - container_client = self.__get_container_client(container_name) - blobs = container_client.list_blobs() - yield from blobs - - def _StorageHandle__list_containers(self): - return self.client.list_containers() - - def _StorageHandle__purge(self) -> None: - """Deletes all files and buckets in the store.""" - for container in self.client.list_containers(): - self.client.delete_container(container.name) - - def _StorageHandle__fget_object(self, container_name: str, object_name: str, target_path): - - with open(target_path, "wb") as f: - blob_data = self.get_object(container_name, object_name) - blob_data.readinto(f) - - def get_object(self, object_name: str, container_name: str = None): - - if container_name is None: - container_name = self.default_container_name - - container_client = self.__get_container_client(container_name) - blob_client = container_client.get_blob_client(object_name) - blob_data = blob_client.download_blob() - - return blob_data - - def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None: - """Removes a file from the store. - - Args: - folder: Folder containing file. - filename: Name of file (without folder) to remove. - container_name: container containing file. 
- """ - if container_name is None: - container_name = self.default_container_name - - path = os.path.join(folder, filename) - - container_client = self.__get_container_client(container_name) - blob_client = container_client.get_blob_client(path) - if blob_client.exists(): - container_client.delete_blob(blob_client) diff --git a/pyinfra/storage/clients/__init__.py b/pyinfra/storage/clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/storage/clients/azure.py b/pyinfra/storage/clients/azure.py new file mode 100644 index 0000000..526465a --- /dev/null +++ b/pyinfra/storage/clients/azure.py @@ -0,0 +1,9 @@ +from azure.storage.blob import BlobServiceClient + +from pyinfra.config import CONFIG + + +def get_azure_client(config=None) -> BlobServiceClient: + if not config: + config = CONFIG + return BlobServiceClient.from_connection_string(conn_str=config.azure_blob_storage.connection_string) diff --git a/pyinfra/storage/clients/s3.py b/pyinfra/storage/clients/s3.py new file mode 100644 index 0000000..1d66265 --- /dev/null +++ b/pyinfra/storage/clients/s3.py @@ -0,0 +1,27 @@ +import re + +from minio import Minio + +from pyinfra.config import CONFIG +from pyinfra.exceptions import InvalidEndpoint + + +def parse_endpoint(endpoint): + endpoint_pattern = r"(?Phttps?)://(?P
(?:(?:(?:\d{1,3}\.){3}\d{1,3})|(?:\w|\.)+)(?:\:\d+)?)" + + match = re.match(endpoint_pattern, endpoint) + + if not match: + raise InvalidEndpoint(f"Endpoint {endpoint} is invalid; expected {endpoint_pattern}") + + return {"secure": match.group("protocol") == "https", "endpoint": match.group("address")} + + +def get_s3_client(config=None) -> Minio: + + if not config: + config = CONFIG + + endpoint = config.s3.endpoint + + return Minio(**parse_endpoint(endpoint), access_key=config.s3.access_key, secret_key=config.s3.secret_key) diff --git a/pyinfra/storage/minio.py b/pyinfra/storage/minio.py deleted file mode 100644 index 5f4f4b4..0000000 --- a/pyinfra/storage/minio.py +++ /dev/null @@ -1,119 +0,0 @@ -"""Implements a wrapper around a MinIO client that provides operations on the MinIO store required by the service.""" -import os -from typing import Iterable - -from minio import Minio - -from pyinfra.config import CONFIG -from pyinfra.storage.storage import StorageHandle - - -def get_minio_client(access_key=None, secret_key=None) -> Minio: - """Instantiates a minio.Minio client. - - Args: - access_key: Access key for MinIO client (username). - secret_key: Secret key for MinIO client (password). - - Returns: - A minio.Minio client. - """ - access_key = CONFIG.minio.user if access_key is None else access_key - secret_key = CONFIG.minio.password if secret_key is None else secret_key - - # TODO: secure=True/False? - return Minio(CONFIG.minio.endpoint, access_key=access_key, secret_key=secret_key, secure=False) - - -class MinioHandle(StorageHandle): - """Wrapper around a MinIO client that provides operations on the MinIO store required by the service.""" - - def __init__(self): - """Initializes a MinioHandle""" - super().__init__() - self.client: Minio = get_minio_client() - self.default_container_name = CONFIG.minio.bucket - self.backend = "s3" - - def _StorageHandle__provide_container(self, container_name): - if not self.client.bucket_exists(container_name): - self.client.make_bucket(container_name) - - def _StorageHandle__add_file(self, path, storage_path, container_name=None): - - if container_name is None: - container_name = self.default_container_name - - self._StorageHandle__provide_container(container_name) - - with open(path, "rb") as f: - stat = os.stat(path) - self.client.put_object(container_name, storage_path, f, stat.st_size) - - def list_files(self, container_name=None) -> Iterable[str]: - """List all files in a container. - - Args: - container_name: container to list files from. - - Returns: - Iterable of filenames. - """ - return self._list_files("object_name", container_name=container_name) - - def get_objects(self, container_name=None): - """Gets all objects in a container. - - Args: - container_name: container to get objects from. - - Returns: - Iterable over all objects in the container. 
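Note: a quick sanity check of parse_endpoint from clients/s3.py above, with hypothetical endpoint values (the "secure" flag feeds Minio(secure=...), and "endpoint" is the host[:port] form the Minio client expects):

    from pyinfra.storage.clients.s3 import parse_endpoint

    parse_endpoint("http://127.0.0.1:9000")
    # -> {"secure": False, "endpoint": "127.0.0.1:9000"}

    parse_endpoint("https://s3.amazonaws.com")
    # -> {"secure": True, "endpoint": "s3.amazonaws.com"}

    parse_endpoint("127.0.0.1:9000")
    # -> raises InvalidEndpoint: the scheme is mandatory, since the
    #    secure flag is derived from it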
- """ - if container_name is None: - container_name = self.default_container_name - yield from self.client.list_objects(container_name, recursive=True) - - def _StorageHandle__list_containers(self): - return self.client.list_buckets() - - def _StorageHandle__purge(self) -> None: - """Deletes all files and containers in the store.""" - for container, obj in self.get_all_objects(): - self.client.remove_object(container.name, obj.object_name) - - for container in self.client.list_buckets(): - self.client.remove_bucket(container.name) - - def _StorageHandle__fget_object(self, container_name, object_name, target_path): - self.client.fget_object(container_name, object_name, target_path) - - def get_object(self, object_name, container_name=None): - if container_name is None: - container_name = self.default_container_name - - response = None - - try: - response = self.client.get_object(container_name, object_name) - return response.data - finally: - if response: - response.close() - response.release_conn() - - def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None: - """Removes a file from the store. - - Args: - folder: Folder containing file. - filename: Name of file (without folder) to remove. - container_name: container containing file. - """ - if container_name is None: - container_name = self.default_container_name - - path = os.path.join(folder, filename) - - if self.client.bucket_exists(container_name): - self.client.remove_object(container_name, path) diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index a1b8d2d..7a86202 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,183 +1,27 @@ -import abc -import gzip -import logging -import os -from itertools import repeat -from operator import attrgetter -from typing import Iterable - -from pyinfra.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory -from pyinfra.utils.retry import NoAttemptsLeft, max_attempts +from pyinfra.storage.adapters.adapter import StorageAdapter -class StorageHandle: - """Storage API base""" +class Storage: + def __init__(self, adapter: StorageAdapter): + self.__adapter = adapter - def __init__(self): - self.default_container_name = None + def make_bucket(self, bucket_name): + self.__adapter.make_bucket(bucket_name) - @abc.abstractmethod - def __provide_container(self, container_name): - pass + def bucket_exists(self, bucket_name): + return self.__adapter.has_bucket(bucket_name) - @abc.abstractmethod - def __add_file(self, path, filename, container_name=None): - pass + def put_object(self, bucket_name, object_name, data): + self.__adapter.put_object(bucket_name, object_name, data) - @max_attempts(n_attempts=10, max_timeout=60) - def add_file(self, path: str, folder: str = None, container_name: str = None) -> None: - """Adds a file to the store. + def get_object(self, bucket_name, object_name): + return self.__adapter.get_object(bucket_name, object_name) - Args: - path: Path to file to add to store. - folder: Folder to hold file. - container_name: container to hold file. - """ - storage_path = self.__storage_path(path, folder=folder) - self.__add_file(path, storage_path, container_name) + def get_all(self, bucket_name): + return self.__adapter.get_all_objects(bucket_name) - @max_attempts() - def _list_files(self, object_name_attr="object_name", container_name=None) -> Iterable[str]: - """List all files in a container. 
+ def clear_bucket(self, bucket_name): + return self.__adapter.clear_bucket(bucket_name) - Args: - container_name: container to list files from. - - Returns: - Iterable of filenames. - """ - if container_name is None: - container_name = self.default_container_name - return map(attrgetter(object_name_attr), self.get_objects(container_name)) - - @abc.abstractmethod - def list_files(self, container_name=None) -> Iterable[str]: - pass - - @abc.abstractmethod - def get_objects(self, container_name=None): - pass - - @abc.abstractmethod - def __list_containers(self): - pass - - @max_attempts() - def get_all_objects(self) -> Iterable: - """Gets all objects in the store - - Returns: - Iterable over all objects in the store. - """ - for container in self.__list_containers(): - yield from zip(repeat(container), self.get_objects(container.name)) - - @abc.abstractmethod - def __purge(self) -> None: - pass - - @max_attempts() - def purge(self) -> None: - self.__purge() - - def list_files_by_type(self, container_name=None, extension=".pdf.gz"): - return filter(lambda p: p.endswith(extension), self.list_files(container_name)) - - @abc.abstractmethod - def __fget_object(self, *args, **kwargs): - pass - - @abc.abstractmethod - def get_object(self, *args, **kwargs): - pass - - @staticmethod - def __storage_path(path, folder: str = None): - def path_to_filename(path): - return os.path.basename(path) - - storage_path = path_to_filename(path) - if folder is not None: - storage_path = os.path.join(folder, storage_path) - - return storage_path - - @max_attempts() - def list_folders_and_files(self, container_name: str = None) -> Iterable[str]: - """Lists pairs of folder name (dossier-IDs) and file name (file-IDs) of items in a container. - - Args: - container_name: container to list items for. - - Returns: - Iterable of pairs folder name (dossier-ID) and file names (file-ID) - """ - return map(lambda p: p.split("/"), self.list_files_by_type(container_name)) - - @abc.abstractmethod - def __remove_file(self, folder: str, filename: str, container_name: str = None) -> None: - pass - - @max_attempts() - def remove_file(self, folder: str, filename: str, container_name: str = None) -> None: - self.__remove_file(folder, filename, container_name) - - def add_file_compressed(self, path, folder: str = None, container_name: str = None) -> None: - """Adds a file as a .gz archive to the store. - - Args: - path: Path to file to add to store. - folder: Folder to hold file. - container_name: container to hold file. - """ - - def compress(path_in: str, path_out: str): - with open(path_in, "rb") as f_in, gzip.open(path_out, "wb") as f_out: - f_out.writelines(f_in) - - path_gz = path_to_compressed_storage_pdf_object_name(path) - compress(path, path_gz) - - self.add_file(path_gz, folder, container_name) - os.unlink(path_gz) - - @max_attempts() - def download_file(self, object_names: str, target_root_dir: str, container_name: str = None) -> str: - """Downloads a file from the store. - - Args: - object_names: Complete object name (folder and file). - target_root_dir: Root directory to download file into (including its folder). - container_name: container to load file from. - - Returns: - str: Path to downloaded file. 
- """ - - @max_attempts(5, exceptions=(FileNotFoundError,)) - def download(object_name: str) -> str: - - path, basename = os.path.split(object_name) - target_dir = os.path.join(target_root_dir, path) - provide_directory(target_dir) - target_path = os.path.join(target_dir, basename) - - logging.log(msg=f"Downloading {object_name}...", level=logging.DEBUG) - try: - self.__fget_object(container_name, object_name, target_path) - logging.log(msg=f"Downloaded {object_name}.", level=logging.DEBUG) - except Exception as err: - logging.log(msg=f"Downloading {object_name} failed.", level=logging.ERROR) - raise err - return target_path - - if container_name is None: - container_name = self.default_container_name - - try: - target_path = download(object_names) - except NoAttemptsLeft as err: - logging.log(msg=f"{err}", level=logging.ERROR) - raise err - - return target_path + def get_all_object_names(self, bucket_name): + return self.__adapter.get_all_object_names(bucket_name) diff --git a/pyinfra/storage/storages.py b/pyinfra/storage/storages.py new file mode 100644 index 0000000..8d194f4 --- /dev/null +++ b/pyinfra/storage/storages.py @@ -0,0 +1,14 @@ +from distutils.command.config import config +from pyinfra.storage.adapters.azure import AzureStorageAdapter +from pyinfra.storage.adapters.s3 import S3StorageAdapter +from pyinfra.storage.clients.azure import get_azure_client +from pyinfra.storage.clients.s3 import get_s3_client +from pyinfra.storage.storage import Storage + + +def get_azure_storage(config=None): + return Storage(AzureStorageAdapter(get_azure_client(config))) + + +def get_s3_storage(config=None): + return Storage(S3StorageAdapter(get_s3_client(config))) diff --git a/pyinfra/test/__init__.py b/pyinfra/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/test/storage/__init__.py b/pyinfra/test/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/test/storage/adapter_mock.py b/pyinfra/test/storage/adapter_mock.py new file mode 100644 index 0000000..98f3d81 --- /dev/null +++ b/pyinfra/test/storage/adapter_mock.py @@ -0,0 +1,30 @@ +from pyinfra.storage.adapters.adapter import StorageAdapter +from pyinfra.test.storage.client_mock import StorageClientMock + + +class StorageAdapterMock(StorageAdapter): + def __init__(self, client: StorageClientMock): + assert isinstance(client, StorageClientMock) + super().__init__(client=client) + self.__client = self._StorageAdapter__client + + def make_bucket(self, bucket_name): + self.__client.make_bucket(bucket_name) + + def has_bucket(self, bucket_name): + return self.__client.has_bucket(bucket_name) + + def put_object(self, bucket_name, object_name, data): + return self.__client.put_object(bucket_name, object_name, data) + + def get_object(self, bucket_name, object_name): + return self.__client.get_object(bucket_name, object_name) + + def get_all_objects(self, bucket_name): + return self.__client.get_all_objects(bucket_name) + + def clear_bucket(self, bucket_name): + return self.__client.clear_bucket(bucket_name) + + def get_all_object_names(self, bucket_name): + return self.__client.get_all_object_names(bucket_name) diff --git a/pyinfra/test/storage/client_mock.py b/pyinfra/test/storage/client_mock.py new file mode 100644 index 0000000..c81b02b --- /dev/null +++ b/pyinfra/test/storage/client_mock.py @@ -0,0 +1,27 @@ +from itertools import repeat + + +class StorageClientMock: + def __init__(self): + self.__data = {} + + def make_bucket(self, bucket_name): + self.__data[bucket_name] = {} + + 
def has_bucket(self, bucket_name): + return bucket_name in self.__data + + def put_object(self, bucket_name, object_name, data): + self.__data[bucket_name][object_name] = data + + def get_object(self, bucket_name, object_name): + return self.__data[bucket_name][object_name] + + def get_all_objects(self, bucket_name): + return self.__data[bucket_name].values() + + def clear_bucket(self, bucket_name): + self.__data[bucket_name] = {} + + def get_all_object_names(self, bucket_name): + return zip(repeat(bucket_name), self.__data[bucket_name]) diff --git a/pyinfra/test/unit_tests/__init__.py b/pyinfra/test/unit_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/test/unit_tests/azure_adapter_test.py b/pyinfra/test/unit_tests/azure_adapter_test.py new file mode 100644 index 0000000..d758668 --- /dev/null +++ b/pyinfra/test/unit_tests/azure_adapter_test.py @@ -0,0 +1,10 @@ +import pytest + +from pyinfra.storage.adapters.azure import AzureStorageAdapter +from pyinfra.test.storage.client_mock import StorageClientMock + + +@pytest.fixture +def adapter(): + adapter = AzureStorageAdapter(StorageClientMock()) + return adapter diff --git a/pyinfra/test/unit_tests/conftest.py b/pyinfra/test/unit_tests/conftest.py new file mode 100644 index 0000000..0bd9b30 --- /dev/null +++ b/pyinfra/test/unit_tests/conftest.py @@ -0,0 +1,6 @@ +import pytest + + +@pytest.fixture +def bucket_name(): + return "pyinfra-test-bucket" diff --git a/pyinfra/test/unit_tests/storage_adapter_test.py b/pyinfra/test/unit_tests/storage_adapter_test.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/pyinfra/test/unit_tests/storage_adapter_test.py @@ -0,0 +1 @@ + diff --git a/pyinfra/test/unit_tests/storage_test.py b/pyinfra/test/unit_tests/storage_test.py new file mode 100644 index 0000000..f0fa9ff --- /dev/null +++ b/pyinfra/test/unit_tests/storage_test.py @@ -0,0 +1,82 @@ +import logging + +import pytest + +from pyinfra.config import CONFIG +from pyinfra.exceptions import UnknownClient +from pyinfra.storage.adapters.azure import AzureStorageAdapter +from pyinfra.storage.adapters.s3 import S3StorageAdapter +from pyinfra.storage.clients.azure import get_azure_client +from pyinfra.storage.clients.s3 import get_s3_client +from pyinfra.storage.storage import Storage +from pyinfra.storage.storages import get_azure_storage, get_s3_storage +from pyinfra.test.storage.adapter_mock import StorageAdapterMock +from pyinfra.test.storage.client_mock import StorageClientMock + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"]) +class TestStorage: + def test_clearing_bucket_yields_empty_bucket(self, storage, bucket_name): + storage.clear_bucket(bucket_name) + data_received = storage.get_all(bucket_name) + assert not {*data_received} + + def test_getting_object_put_in_bucket_is_object(self, storage, bucket_name): + storage.put_object(bucket_name, "file", b"content") + data_received = storage.get_object(bucket_name, "file") + assert b"content" == data_received + + def test_getting_nested_object_put_in_bucket_is_nested_object(self, storage, bucket_name): + storage.put_object(bucket_name, "folder/file", b"content") + data_received = storage.get_object(bucket_name, "folder/file") + assert b"content" == data_received + + def test_getting_objects_put_in_bucket_are_objects(self, storage, bucket_name): + storage.put_object(bucket_name, "file1", b"content 1") + storage.put_object(bucket_name, "folder/file2", b"content 2") + 
data_received = storage.get_all(bucket_name) + assert {b"content 1", b"content 2"} == {*data_received} + + def test_make_bucket_produces_bucket(self, storage, bucket_name): + storage.make_bucket(bucket_name) + assert storage.bucket_exists(bucket_name) + + def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name): + storage.put_object(bucket_name, "file1", b"content 1") + storage.put_object(bucket_name, "file2", b"content 2") + full_names_received = storage.get_all_object_names(bucket_name) + assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received} + + +def get_adapter(client_name, s3_backend): + if client_name == "mock": + return StorageAdapterMock(StorageClientMock()) + if client_name == "azure": + return AzureStorageAdapter(get_azure_client(CONFIG.test.azure)) + if client_name == "s3": + return S3StorageAdapter(get_s3_client(CONFIG.test[s3_backend])) + else: + raise UnknownClient(client_name) + + +@pytest.fixture(params=["minio", "aws"]) +def storage(client_name, bucket_name, request): + logger.debug("Setup for storage") + storage = Storage(get_adapter(client_name, request.param)) + storage.make_bucket(bucket_name) + storage.clear_bucket(bucket_name) + yield storage + logger.debug("Teardown for storage") + storage.clear_bucket(bucket_name) + + +def test_get_azure_storage_yields_storage(): + assert isinstance(get_azure_storage(), Storage) + + +def test_get_s3_storage_yields_storage(): + assert isinstance(get_s3_storage(), Storage) diff --git a/pyinfra/utils/file.py b/pyinfra/utils/file.py index c3b865a..4068df4 100644 --- a/pyinfra/utils/file.py +++ b/pyinfra/utils/file.py @@ -2,18 +2,12 @@ import os - -def provide_directory(path): - if os.path.isfile(path): - provide_directory(os.path.dirname(path)) - if not os.path.isdir(path): - try: - os.makedirs(path) - except FileExistsError: - pass +from pyinfra.config import CONFIG -def produce_compressed_storage_pdf_object_name(path_no_ext, ext="pdf"): +def produce_compressed_storage_pdf_object_name(path_no_ext, ext=None): + if not ext: + ext = CONFIG.service.target_file_extension return f"{path_no_ext}.ORIGIN.{ext}.gz" @@ -21,9 +15,3 @@ def dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, fil path_no_ext = os.path.join(dossier_id, file_id) pdf_object_name = produce_compressed_storage_pdf_object_name(path_no_ext) return pdf_object_name - - -def path_to_compressed_storage_pdf_object_name(path): - path_no_ext, ext = os.path.splitext(path) - path_gz = produce_compressed_storage_pdf_object_name(path_no_ext) - return path_gz diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..19deb86 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +log_cli = 1 +log_cli_level = DEBUG + diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 6433404..570d63c 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -1,9 +1,13 @@ import argparse +import gzip import os +from pathlib import Path from tqdm import tqdm -from pyinfra.storage.minio import MinioHandle +from pyinfra.config import CONFIG +from pyinfra.storage.storages import get_s3_storage +from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name def parse_args(): @@ -17,35 +21,39 @@ def parse_args(): add_group.add_argument("--file", "-f") add_group.add_argument("--directory", "-d") - parser_remove = subparsers.add_parser("remove", help="Remove a file from the MinIO store") - parser_remove.add_argument("dossier_id") - 
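Note: in storage_test.py above, the class-level parametrize ("mock", "azure", "s3") multiplies with the storage fixture's own params ("minio", "aws"), so every test in TestStorage runs six times. Since get_adapter only consults s3_backend when client_name == "s3", the mock and azure variants each run twice against identical setups. A condensed, self-contained sketch of the pattern (names hypothetical):

    import pytest

    def make_storage(client_name, s3_backend):
        # hypothetical stand-in for Storage(get_adapter(client_name, s3_backend))
        return (client_name, s3_backend)

    @pytest.fixture(params=["minio", "aws"])
    def storage(client_name, request):
        # client_name is injected from the class-level parametrize below;
        # request.param supplies the S3 backend for this fixture instance
        return make_storage(client_name, request.param)

    @pytest.mark.parametrize("client_name", ["mock", "azure", "s3"])
    class TestMatrix:
        def test_runs_for_each_combination(self, storage):
            assert storage  # 3 client names x 2 backend params = 6 runs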
parser_remove.add_argument("file_path") - subparsers.add_parser("purge", help="Delete all files and buckets in the MinIO store") args = parser.parse_args() return args +def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: + + path_gz = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, Path(path).stem) + + with open(path, "rb") as f: + data = gzip.compress(f.read()) + storage.put_object(bucket_name, path_gz, data) + + if __name__ == "__main__": - client = MinioHandle() + storage = get_s3_storage() + bucket_name = CONFIG.test.bucket + if not storage.has_bucket(bucket_name): + storage.make_bucket(bucket_name) args = parse_args() if args.command == "add": if args.file: - client.add_file_compressed(args.file, folder=args.dossier_id) + add_file_compressed(storage, bucket_name, args.dossier_id, args.file) elif args.directory: for fname in tqdm([*os.listdir(args.directory)], desc="Adding files"): - path = os.path.join(args.directory, fname) - client.add_file_compressed(path, folder=args.dossier_id) - - elif args.command == "remove": - fname = os.path.basename(args.file_path) - client.remove_file(folder=args.dossier_id, filename=fname) + path = Path(args.directory) / fname + add_file_compressed(storage, bucket_name, args.dossier_id, path) elif args.command == "purge": - client.purge() + storage.clear_bucket(bucket_name) diff --git a/scripts/mock_client.py b/scripts/mock_client.py index df9addc..943ccc6 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -1,14 +1,15 @@ import json from pyinfra.config import CONFIG -from pyinfra.storage.minio import MinioHandle from pyinfra.rabbitmq import make_channel, declare_queue, make_connection +from pyinfra.storage.storages import get_s3_storage def build_message_bodies(): - minio_client = MinioHandle() - for dossier_id, pdf_name in minio_client.list_folders_and_files(): + storage = get_s3_storage() + for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.test.bucket): file_id = pdf_name.split(".")[0] + dossier_id, file_id = file_id.split("/") yield json.dumps({"dossierId": dossier_id, "fileId": file_id}) diff --git a/src/serve.py b/src/serve.py index a37f3b4..f7b6d00 100644 --- a/src/serve.py +++ b/src/serve.py @@ -13,20 +13,16 @@ from pyinfra.consume import consume, ConsumerError from pyinfra.core import make_payload_processor, make_storage_data_loader, make_analyzer from pyinfra.exceptions import UnknownStorageBackend from pyinfra.flask import run_probing_webserver, set_up_probing_webserver -from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle -from pyinfra.storage.minio import MinioHandle - - -# TODO: implement meaningful checks +from pyinfra.storage.storages import get_azure_storage, get_s3_storage def get_storage(): storage_backend = CONFIG.service.storage_backend if storage_backend == "s3": - storage = MinioHandle() + storage = get_s3_storage() elif storage_backend == "azure": - storage = AzureBlobStorageHandle() + storage = get_azure_storage() else: raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.") @@ -44,7 +40,7 @@ def republish(channel, body, n_current_attempts): def make_callback(): - load_data = make_storage_data_loader(get_storage()) + load_data = make_storage_data_loader(get_storage(), CONFIG.service.bucket_name) analyze_file = make_analyzer(CONFIG.service.analysis_endpoint) json_wrapped_body_processor = make_payload_processor(load_data, analyze_file) @@ -66,6 +62,7 @@ def make_callback(): def main(): + # TODO: implement meaningful 
checks webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),)) logging.info("Starting webserver...") webserver.start()
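Note: to close the loop, a hedged end-to-end sketch of the consume-path wiring after this refactoring (it assumes make_storage_data_loader returns the inner download callable, as its use in make_callback suggests; payload values are hypothetical):

    from pyinfra.config import CONFIG
    from pyinfra.core import make_storage_data_loader
    from pyinfra.storage.storages import get_s3_storage

    storage = get_s3_storage()  # or get_azure_storage(), per CONFIG.service.storage_backend
    load_data = make_storage_data_loader(storage, CONFIG.service.bucket_name)

    # For this payload the object name resolves to "dossier-1/file-1.ORIGIN.pdf.gz"
    # via pyinfra.utils.file (extension from CONFIG.service.target_file_extension,
    # default "pdf").
    data = load_data({"dossierId": "dossier-1", "fileId": "file-1"})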