Pull request #11: storage refactoring

Merge in RR/pyinfra from s3_refactoring to master

Squashed commit of the following:

commit d9224b879d47c6f1574d78b7239a209f800972dc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 12:52:57 2022 +0100

    fixed broken reference

commit 4189ebb0481c85b5984adb8c6008bbc9bae6d71b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 12:51:11 2022 +0100

    renaming

commit 945f2397c5301409be19229e11bb50a733f6e2ff
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 12:48:13 2022 +0100

    renaming

commit 3d86500b4b703a56cb32d998c1c12e8fd06e3b2c
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 12:45:32 2022 +0100

    renaming

commit 4b26acca978ad49bb0b6e785addcb601d90214db
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 12:39:08 2022 +0100

    renaming

commit 81be5e55b6d70ccb9f8b07b5860f89da7231122f
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 11:44:46 2022 +0100

    renaming

commit 23b35b1f91def3a6185315772c1249826637e627
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Feb 25 11:31:15 2022 +0100

    added teardown step of clearing test bucket to storage tests; added tests for nested paths

commit 6e26c1a6962bfe867827f80a38c091f63fa395f0
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Fri Feb 25 10:56:32 2022 +0100

    black & removed käsebrot

commit 519c88b39933fa699e6eff5d29bfea7ed1ca8719
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Fri Feb 25 10:55:18 2022 +0100

    data loading/publishing adjusted to new storage logic

commit c530b5929a6f892663009eef4b92f04b4f7f533c
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Thu Feb 24 17:24:47 2022 +0100

    removing old storage logic WIP

commit 880e22832cd203dc0d0471729996cbd0b5f455f1
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Thu Feb 24 17:12:44 2022 +0100

    manage minio script refactor

commit 01149770750bdf88094376b1f731ace343f32d76
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Thu Feb 24 16:32:23 2022 +0100

    renaming

commit 7d6dbabf30d928cb699564e2e7a668253ec6990b
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Thu Feb 24 16:26:10 2022 +0100

    added list bucket files & refactor serve.py to take new storage logic

commit b20d6fd26b4523538fd02acc5889b932de2e3f03
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Feb 24 15:42:23 2022 +0100

    added compose file reference to pytest.ini; added coveragerc

commit ff1c7e319bd229bc5dd905ddbc04c211b78f88cd
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Feb 24 15:41:13 2022 +0100

    added pytest.ini

commit d32828ad917a20cf16785a31008d90b4f2ed4442
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Thu Feb 24 15:37:30 2022 +0100

    black

commit 36931850c3d720b76d39a9f503abeaa326132531
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Thu Feb 24 15:35:55 2022 +0100

    get storage test added

commit 0eb7aaf795217bb34f8f73b5e1e90576b55e9374
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Feb 24 15:08:29 2022 +0100

    added fixture magic for testing different S3 backends

commit f3f1c42e574365702be632a416b07f025746e989
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Feb 24 14:47:03 2022 +0100

    defined test sections in config for storage endpoints

commit 16e34052da2cc8c61473d21c1929023445e619d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Feb 24 14:11:53 2022 +0100

    refactoring; implemented S3 handle

... and 18 more commits
This commit is contained in:
Matthias Bisping 2022-02-25 12:58:55 +01:00
parent dccb9ac67a
commit dca206cda5
30 changed files with 523 additions and 461 deletions

54
.coveragerc Normal file
View File

@ -0,0 +1,54 @@
# .coveragerc to control coverage.py
[run]
branch = True
omit =
*/site-packages/*
*/distutils/*
*/test/*
*/__init__.py
*/setup.py
*/venv/*
*/env/*
*/build_venv/*
*/build_env/*
source =
pyinfra
src
relative_files = True
data_file = .coverage
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover
# Don't complain about missing debug-only code:
def __repr__
if self\.debug
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
omit =
*/site-packages/*
*/distutils/*
*/test/*
*/__init__.py
*/setup.py
*/venv/*
*/env/*
*/build_venv/*
*/build_env/*
ignore_errors = True
[html]
directory = reports
[xml]
output = reports/coverage.xml

View File

@ -16,10 +16,10 @@ rabbitmq:
enabled: $RETRY|False # Toggles retry behaviour
max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue
minio:
endpoint: $STORAGE_ENDPOINT|"127.0.0.1:9000" # MinIO endpoint
user: $STORAGE_KEY|root # MinIO user name
password: $STORAGE_SECRET|password # MinIO user password
s3:
endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000" # MinIO endpoint
access_key: $STORAGE_KEY|root # MinIO user name
secret_key: $STORAGE_SECRET|password # MinIO user password
bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket
azure_blob_storage:
@ -31,8 +31,29 @@ service:
name: $SERVICE_NAME|pyinfra-service-v1 # Name of the service in the kubernetes cluster
storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from
analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000"
bucket_name: $STORAGE_BUCKET_NAME|"pyinfra-test-bucket" # TODO Maybe refactor!
target_file_extension: $TARGET_FILE_EXTENSION|"pdf" # Defines type of file pulled from storage
probing_webserver:
host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address
port: $PROBING_WEBSERVER_PORT|8080 # Probe webserver port
mode: $PROBING_WEBSERVER_MODE|production # webserver mode: {development, production}
test:
minio:
s3:
endpoint: "http://127.0.0.1:9000"
access_key: root
secret_key: password
aws:
s3:
endpoint: https://s3.amazonaws.com
access_key: AKIA4QVP6D4LCDAGYGN2
secret_key: 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
azure:
azure_blob_storage:
connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
bucket: "pyinfra-test-bucket"

View File

@ -24,6 +24,9 @@ class DotIndexable:
def __repr__(self):
return self.x.__repr__()
def __getitem__(self, item):
return self.__getattr__(item)
class Config:
def __init__(self, config_path):

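For reference, a minimal sketch of how the new test sections above can be addressed through the Config/DotIndexable wrapper; since __getitem__ delegates to __getattr__, attribute and key access are interchangeable (values taken from the example config above):

from pyinfra.config import CONFIG

# Attribute access and item access resolve to the same node:
endpoint_a = CONFIG.test.minio.s3.endpoint      # -> "http://127.0.0.1:9000"
endpoint_b = CONFIG.test["minio"].s3.endpoint   # same value, key chosen at runtime
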
View File

@ -9,7 +9,7 @@ from pyinfra.exceptions import DataLoadingFailure, AnalysisFailure, ProcessingFa
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
def make_storage_data_loader(storage):
def make_storage_data_loader(storage, bucket_name):
def get_object_name(payload: dict) -> str:
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
@ -18,7 +18,7 @@ def make_storage_data_loader(storage):
def download(payload):
object_name = get_object_name(payload)
logging.debug(f"Downloading {object_name}...")
data = storage.get_object(object_name)
data = storage.get_object(bucket_name, object_name)
logging.debug(f"Downloaded {object_name}.")
return data

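For orientation, a hedged sketch of how the reworked loader is wired up, mirroring the serve.py change further down in this diff; the payload values are illustrative and the object is assumed to exist in the bucket:

from pyinfra.config import CONFIG
from pyinfra.core import make_storage_data_loader
from pyinfra.storage.storages import get_s3_storage

storage = get_s3_storage()
load_data = make_storage_data_loader(storage, CONFIG.service.bucket_name)

# The loader derives the object name from the message payload and fetches it
# from the configured bucket via storage.get_object(bucket_name, object_name).
data = load_data({"dossierId": "dossier-1", "fileId": "file-1"})
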
View File

@ -12,3 +12,11 @@ class ProcessingFailure(Exception):
class UnknownStorageBackend(ValueError):
pass
class InvalidEndpoint(ValueError):
pass
class UnknownClient(ValueError):
pass

View File

View File

@ -0,0 +1,34 @@
from abc import ABC, abstractmethod
class StorageAdapter(ABC):
def __init__(self, client):
self.__client = client
@abstractmethod
def make_bucket(self, bucket_name):
pass
@abstractmethod
def has_bucket(self, bucket_name):
pass
@abstractmethod
def put_object(self, bucket_name, object_name, data):
pass
@abstractmethod
def get_object(self, bucket_name, object_name):
pass
@abstractmethod
def get_all_objects(self, bucket_name):
pass
@abstractmethod
def clear_bucket(self, bucket_name):
pass
@abstractmethod
def get_all_object_names(self, bucket_name):
pass

View File

@ -0,0 +1,79 @@
from itertools import repeat
import logging
from functools import wraps
from operator import attrgetter
from azure.storage.blob import ContainerClient, BlobServiceClient
from retry import retry
from pyinfra.storage.adapters.adapter import StorageAdapter
logger = logging.getLogger(__name__)
logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
def _retry(exceptions=Exception):
def inner(func):
@retry(exceptions=exceptions, delay=5, jitter=(0, 3), max_delay=60)
@wraps(func)
def inner(*args, **kwargs):
return func(*args, **kwargs)
return inner
return inner
class AzureStorageAdapter(StorageAdapter):
def __init__(self, client):
super().__init__(client=client)
self.__client: BlobServiceClient = self._StorageAdapter__client
def has_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
return container_client.exists()
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def __provide_container_client(self, bucket_name) -> ContainerClient:
self.make_bucket(bucket_name)
container_client = self.__client.get_container_client(bucket_name)
return container_client
def put_object(self, bucket_name, object_name, data):
logger.debug(f"Uploading '{object_name}'...")
container_client = self.__provide_container_client(bucket_name)
blob_client = container_client.get_blob_client(object_name)
blob_client.upload_blob(data, overwrite=True)
def get_object(self, bucket_name, object_name):
logger.debug(f"Downloading '{object_name}'...")
container_client = self.__provide_container_client(bucket_name)
blob_client = container_client.get_blob_client(object_name)
blob_data = blob_client.download_blob()
return blob_data.readall()
def get_all_objects(self, bucket_name):
container_client = self.__provide_container_client(bucket_name)
blobs = container_client.list_blobs()
for blob in blobs:
logger.debug(f"Downloading '{blob.name}'...")
blob_client = container_client.get_blob_client(blob)
blob_data = blob_client.download_blob()
data = blob_data.readall()
yield data
def clear_bucket(self, bucket_name):
logger.debug(f"Clearing Azure container '{bucket_name}'...")
container_client = self.__client.get_container_client(bucket_name)
blobs = container_client.list_blobs()
container_client.delete_blobs(*blobs)
def get_all_object_names(self, bucket_name):
container_client = self.__provide_container_client(bucket_name)
blobs = container_client.list_blobs()
return zip(repeat(bucket_name), map(attrgetter("name"), blobs))

View File

@ -0,0 +1,55 @@
import io
from itertools import repeat
import logging
from operator import attrgetter
from minio import Minio
from pyinfra.storage.adapters.adapter import StorageAdapter
logger = logging.getLogger(__name__)
class S3StorageAdapter(StorageAdapter):
def __init__(self, client):
super().__init__(client=client)
self.__client: Minio = self._StorageAdapter__client
def make_bucket(self, bucket_name):
if not self.has_bucket(bucket_name):
self.__client.make_bucket(bucket_name)
def has_bucket(self, bucket_name):
return self.__client.bucket_exists(bucket_name)
def put_object(self, bucket_name, object_name, data):
logger.debug(f"Uploading '{object_name}'...")
data = io.BytesIO(data)
self.__client.put_object(bucket_name, object_name, data, length=data.getbuffer().nbytes)
def get_object(self, bucket_name, object_name):
logger.debug(f"Downloading '{object_name}'...")
response = None
try:
response = self.__client.get_object(bucket_name, object_name)
return response.data
finally:
if response:
response.close()
response.release_conn()
def get_all_objects(self, bucket_name):
for obj in self.__client.list_objects(bucket_name, recursive=True):
logger.debug(f"Downloading '{obj.object_name}'...")
yield self.get_object(bucket_name, obj.object_name)
def clear_bucket(self, bucket_name):
logger.debug(f"Clearing S3 bucket '{bucket_name}'...")
objects = self.__client.list_objects(bucket_name, recursive=True)
for obj in objects:
self.__client.remove_object(bucket_name, obj.object_name)
def get_all_object_names(self, bucket_name):
objs = self.__client.list_objects(bucket_name, recursive=True)
return zip(repeat(bucket_name), map(attrgetter("object_name"), objs))

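A short usage sketch for the adapter above, assuming a local MinIO reachable with the default credentials from the example config (host, keys and bucket name are illustrative):

from minio import Minio
from pyinfra.storage.adapters.s3 import S3StorageAdapter

client = Minio("127.0.0.1:9000", access_key="root", secret_key="password", secure=False)
adapter = S3StorageAdapter(client)

adapter.make_bucket("pyinfra-test-bucket")  # no-op if the bucket already exists
adapter.put_object("pyinfra-test-bucket", "folder/file", b"content")
assert adapter.get_object("pyinfra-test-bucket", "folder/file") == b"content"
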
View File

@ -1,121 +0,0 @@
"""Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by the service."""
import os
from typing import Iterable
from azure.storage.blob import BlobServiceClient, ContainerClient
from pyinfra.config import CONFIG
from pyinfra.storage.storage import StorageHandle
def get_blob_service_client(connection_string=None) -> BlobServiceClient:
"""Instantiates a minio.Minio client.
Args:
connection_string: Azure blob storage connection string.
Returns:
A minio.Minio client.
"""
connection_string = CONFIG.azure_blob_storage.connection_string if connection_string is None else connection_string
return BlobServiceClient.from_connection_string(conn_str=connection_string)
class AzureBlobStorageHandle(StorageHandle):
"""Implements a wrapper around an Azure blob storage client that provides operations on the blob storage required by
the service.
"""
def __init__(self):
"""Initializes an AzureBlobStorageHandle"""
super().__init__()
self.client: BlobServiceClient = get_blob_service_client()
self.default_container_name = CONFIG.azure_blob_storage.container
self.backend = "azure"
def __get_container_client(self, container_name) -> ContainerClient:
if container_name is None:
container_name = self.default_container_name
container_client = self._StorageHandle__provide_container(container_name)
return container_client
def _StorageHandle__provide_container(self, container_name: str):
container_client = self.client.get_container_client(container_name)
return container_client if container_client.exists() else self.client.create_container(container_name)
def _StorageHandle__add_file(self, path, storage_path, container_name: str = None):
container_client = self.__get_container_client(container_name)
blob_client = container_client.get_blob_client(storage_path)
with open(path, "rb") as f:
blob_client.upload_blob(f, overwrite=True)
def list_files(self, container_name=None) -> Iterable[str]:
"""List all files in a container.
Args:
container_name: container to list files from.
Returns:
Iterable of filenames.
"""
return self._list_files("name", container_name=container_name)
def get_objects(self, container_name=None):
"""List all files in a container_name.
Args:
container_name: Container to list files from.
Returns:
Iterable over all objects in the container.
"""
container_client = self.__get_container_client(container_name)
blobs = container_client.list_blobs()
yield from blobs
def _StorageHandle__list_containers(self):
return self.client.list_containers()
def _StorageHandle__purge(self) -> None:
"""Deletes all files and buckets in the store."""
for container in self.client.list_containers():
self.client.delete_container(container.name)
def _StorageHandle__fget_object(self, container_name: str, object_name: str, target_path):
with open(target_path, "wb") as f:
blob_data = self.get_object(container_name, object_name)
blob_data.readinto(f)
def get_object(self, object_name: str, container_name: str = None):
if container_name is None:
container_name = self.default_container_name
container_client = self.__get_container_client(container_name)
blob_client = container_client.get_blob_client(object_name)
blob_data = blob_client.download_blob()
return blob_data
def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
"""Removes a file from the store.
Args:
folder: Folder containing file.
filename: Name of file (without folder) to remove.
container_name: container containing file.
"""
if container_name is None:
container_name = self.default_container_name
path = os.path.join(folder, filename)
container_client = self.__get_container_client(container_name)
blob_client = container_client.get_blob_client(path)
if blob_client.exists():
container_client.delete_blob(blob_client)

View File

View File

@ -0,0 +1,9 @@
from azure.storage.blob import BlobServiceClient
from pyinfra.config import CONFIG
def get_azure_client(config=None) -> BlobServiceClient:
if not config:
config = CONFIG
return BlobServiceClient.from_connection_string(conn_str=config.azure_blob_storage.connection_string)

View File

@ -0,0 +1,27 @@
import re
from minio import Minio
from pyinfra.config import CONFIG
from pyinfra.exceptions import InvalidEndpoint
def parse_endpoint(endpoint):
endpoint_pattern = r"(?P<protocol>https?)://(?P<address>(?:(?:(?:\d{1,3}\.){3}\d{1,3})|(?:\w|\.)+)(?:\:\d+)?)"
match = re.match(endpoint_pattern, endpoint)
if not match:
raise InvalidEndpoint(f"Endpoint {endpoint} is invalid; expected {endpoint_pattern}")
return {"secure": match.group("protocol") == "https", "endpoint": match.group("address")}
def get_s3_client(config=None) -> Minio:
if not config:
config = CONFIG
endpoint = config.s3.endpoint
return Minio(**parse_endpoint(endpoint), access_key=config.s3.access_key, secret_key=config.s3.secret_key)

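A worked example of the endpoint parsing above; the protocol toggles the secure flag, and the host (with optional port) becomes the Minio endpoint:

parse_endpoint("http://127.0.0.1:9000")
# -> {"secure": False, "endpoint": "127.0.0.1:9000"}
parse_endpoint("https://s3.amazonaws.com")
# -> {"secure": True, "endpoint": "s3.amazonaws.com"}
parse_endpoint("127.0.0.1:9000")
# -> raises InvalidEndpoint (no protocol prefix)
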
View File

@ -1,119 +0,0 @@
"""Implements a wrapper around a MinIO client that provides operations on the MinIO store required by the service."""
import os
from typing import Iterable
from minio import Minio
from pyinfra.config import CONFIG
from pyinfra.storage.storage import StorageHandle
def get_minio_client(access_key=None, secret_key=None) -> Minio:
"""Instantiates a minio.Minio client.
Args:
access_key: Access key for MinIO client (username).
secret_key: Secret key for MinIO client (password).
Returns:
A minio.Minio client.
"""
access_key = CONFIG.minio.user if access_key is None else access_key
secret_key = CONFIG.minio.password if secret_key is None else secret_key
# TODO: secure=True/False?
return Minio(CONFIG.minio.endpoint, access_key=access_key, secret_key=secret_key, secure=False)
class MinioHandle(StorageHandle):
"""Wrapper around a MinIO client that provides operations on the MinIO store required by the service."""
def __init__(self):
"""Initializes a MinioHandle"""
super().__init__()
self.client: Minio = get_minio_client()
self.default_container_name = CONFIG.minio.bucket
self.backend = "s3"
def _StorageHandle__provide_container(self, container_name):
if not self.client.bucket_exists(container_name):
self.client.make_bucket(container_name)
def _StorageHandle__add_file(self, path, storage_path, container_name=None):
if container_name is None:
container_name = self.default_container_name
self._StorageHandle__provide_container(container_name)
with open(path, "rb") as f:
stat = os.stat(path)
self.client.put_object(container_name, storage_path, f, stat.st_size)
def list_files(self, container_name=None) -> Iterable[str]:
"""List all files in a container.
Args:
container_name: container to list files from.
Returns:
Iterable of filenames.
"""
return self._list_files("object_name", container_name=container_name)
def get_objects(self, container_name=None):
"""Gets all objects in a container.
Args:
container_name: container to get objects from.
Returns:
Iterable over all objects in the container.
"""
if container_name is None:
container_name = self.default_container_name
yield from self.client.list_objects(container_name, recursive=True)
def _StorageHandle__list_containers(self):
return self.client.list_buckets()
def _StorageHandle__purge(self) -> None:
"""Deletes all files and containers in the store."""
for container, obj in self.get_all_objects():
self.client.remove_object(container.name, obj.object_name)
for container in self.client.list_buckets():
self.client.remove_bucket(container.name)
def _StorageHandle__fget_object(self, container_name, object_name, target_path):
self.client.fget_object(container_name, object_name, target_path)
def get_object(self, object_name, container_name=None):
if container_name is None:
container_name = self.default_container_name
response = None
try:
response = self.client.get_object(container_name, object_name)
return response.data
finally:
if response:
response.close()
response.release_conn()
def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
"""Removes a file from the store.
Args:
folder: Folder containing file.
filename: Name of file (without folder) to remove.
container_name: container containing file.
"""
if container_name is None:
container_name = self.default_container_name
path = os.path.join(folder, filename)
if self.client.bucket_exists(container_name):
self.client.remove_object(container_name, path)

View File

@ -1,183 +1,27 @@
import abc
import gzip
import logging
import os
from itertools import repeat
from operator import attrgetter
from typing import Iterable
from pyinfra.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
from pyinfra.utils.retry import NoAttemptsLeft, max_attempts
from pyinfra.storage.adapters.adapter import StorageAdapter
class StorageHandle:
"""Storage API base"""
class Storage:
def __init__(self, adapter: StorageAdapter):
self.__adapter = adapter
def __init__(self):
self.default_container_name = None
def make_bucket(self, bucket_name):
self.__adapter.make_bucket(bucket_name)
@abc.abstractmethod
def __provide_container(self, container_name):
pass
def bucket_exists(self, bucket_name):
return self.__adapter.has_bucket(bucket_name)
@abc.abstractmethod
def __add_file(self, path, filename, container_name=None):
pass
def put_object(self, bucket_name, object_name, data):
self.__adapter.put_object(bucket_name, object_name, data)
@max_attempts(n_attempts=10, max_timeout=60)
def add_file(self, path: str, folder: str = None, container_name: str = None) -> None:
"""Adds a file to the store.
def get_object(self, bucket_name, object_name):
return self.__adapter.get_object(bucket_name, object_name)
Args:
path: Path to file to add to store.
folder: Folder to hold file.
container_name: container to hold file.
"""
storage_path = self.__storage_path(path, folder=folder)
self.__add_file(path, storage_path, container_name)
def get_all(self, bucket_name):
return self.__adapter.get_all_objects(bucket_name)
@max_attempts()
def _list_files(self, object_name_attr="object_name", container_name=None) -> Iterable[str]:
"""List all files in a container.
def clear_bucket(self, bucket_name):
return self.__adapter.clear_bucket(bucket_name)
Args:
container_name: container to list files from.
Returns:
Iterable of filenames.
"""
if container_name is None:
container_name = self.default_container_name
return map(attrgetter(object_name_attr), self.get_objects(container_name))
@abc.abstractmethod
def list_files(self, container_name=None) -> Iterable[str]:
pass
@abc.abstractmethod
def get_objects(self, container_name=None):
pass
@abc.abstractmethod
def __list_containers(self):
pass
@max_attempts()
def get_all_objects(self) -> Iterable:
"""Gets all objects in the store
Returns:
Iterable over all objects in the store.
"""
for container in self.__list_containers():
yield from zip(repeat(container), self.get_objects(container.name))
@abc.abstractmethod
def __purge(self) -> None:
pass
@max_attempts()
def purge(self) -> None:
self.__purge()
def list_files_by_type(self, container_name=None, extension=".pdf.gz"):
return filter(lambda p: p.endswith(extension), self.list_files(container_name))
@abc.abstractmethod
def __fget_object(self, *args, **kwargs):
pass
@abc.abstractmethod
def get_object(self, *args, **kwargs):
pass
@staticmethod
def __storage_path(path, folder: str = None):
def path_to_filename(path):
return os.path.basename(path)
storage_path = path_to_filename(path)
if folder is not None:
storage_path = os.path.join(folder, storage_path)
return storage_path
@max_attempts()
def list_folders_and_files(self, container_name: str = None) -> Iterable[str]:
"""Lists pairs of folder name (dossier-IDs) and file name (file-IDs) of items in a container.
Args:
container_name: container to list items for.
Returns:
Iterable of pairs folder name (dossier-ID) and file names (file-ID)
"""
return map(lambda p: p.split("/"), self.list_files_by_type(container_name))
@abc.abstractmethod
def __remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
pass
@max_attempts()
def remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
self.__remove_file(folder, filename, container_name)
def add_file_compressed(self, path, folder: str = None, container_name: str = None) -> None:
"""Adds a file as a .gz archive to the store.
Args:
path: Path to file to add to store.
folder: Folder to hold file.
container_name: container to hold file.
"""
def compress(path_in: str, path_out: str):
with open(path_in, "rb") as f_in, gzip.open(path_out, "wb") as f_out:
f_out.writelines(f_in)
path_gz = path_to_compressed_storage_pdf_object_name(path)
compress(path, path_gz)
self.add_file(path_gz, folder, container_name)
os.unlink(path_gz)
@max_attempts()
def download_file(self, object_names: str, target_root_dir: str, container_name: str = None) -> str:
"""Downloads a file from the store.
Args:
object_names: Complete object name (folder and file).
target_root_dir: Root directory to download file into (including its folder).
container_name: container to load file from.
Returns:
str: Path to downloaded file.
"""
@max_attempts(5, exceptions=(FileNotFoundError,))
def download(object_name: str) -> str:
path, basename = os.path.split(object_name)
target_dir = os.path.join(target_root_dir, path)
provide_directory(target_dir)
target_path = os.path.join(target_dir, basename)
logging.log(msg=f"Downloading {object_name}...", level=logging.DEBUG)
try:
self.__fget_object(container_name, object_name, target_path)
logging.log(msg=f"Downloaded {object_name}.", level=logging.DEBUG)
except Exception as err:
logging.log(msg=f"Downloading {object_name} failed.", level=logging.ERROR)
raise err
return target_path
if container_name is None:
container_name = self.default_container_name
try:
target_path = download(object_names)
except NoAttemptsLeft as err:
logging.log(msg=f"{err}", level=logging.ERROR)
raise err
return target_path
def get_all_object_names(self, bucket_name):
return self.__adapter.get_all_object_names(bucket_name)

View File

@ -0,0 +1,14 @@
from distutils.command.config import config
from pyinfra.storage.adapters.azure import AzureStorageAdapter
from pyinfra.storage.adapters.s3 import S3StorageAdapter
from pyinfra.storage.clients.azure import get_azure_client
from pyinfra.storage.clients.s3 import get_s3_client
from pyinfra.storage.storage import Storage
def get_azure_storage(config=None):
return Storage(AzureStorageAdapter(get_azure_client(config)))
def get_s3_storage(config=None):
return Storage(S3StorageAdapter(get_s3_client(config)))

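Putting the pieces together, a minimal end-to-end sketch using the factory above (bucket and object names are illustrative; get_s3_storage() falls back to the global CONFIG when no config is passed):

from pyinfra.storage.storages import get_s3_storage

storage = get_s3_storage()
storage.make_bucket("pyinfra-test-bucket")
storage.put_object("pyinfra-test-bucket", "dossier-1/file-1.ORIGIN.pdf.gz", b"content")

data = storage.get_object("pyinfra-test-bucket", "dossier-1/file-1.ORIGIN.pdf.gz")
names = list(storage.get_all_object_names("pyinfra-test-bucket"))
# names == [("pyinfra-test-bucket", "dossier-1/file-1.ORIGIN.pdf.gz")]
storage.clear_bucket("pyinfra-test-bucket")
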
0
pyinfra/test/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,30 @@
from pyinfra.storage.adapters.adapter import StorageAdapter
from pyinfra.test.storage.client_mock import StorageClientMock
class StorageAdapterMock(StorageAdapter):
def __init__(self, client: StorageClientMock):
assert isinstance(client, StorageClientMock)
super().__init__(client=client)
self.__client = self._StorageAdapter__client
def make_bucket(self, bucket_name):
self.__client.make_bucket(bucket_name)
def has_bucket(self, bucket_name):
return self.__client.has_bucket(bucket_name)
def put_object(self, bucket_name, object_name, data):
return self.__client.put_object(bucket_name, object_name, data)
def get_object(self, bucket_name, object_name):
return self.__client.get_object(bucket_name, object_name)
def get_all_objects(self, bucket_name):
return self.__client.get_all_objects(bucket_name)
def clear_bucket(self, bucket_name):
return self.__client.clear_bucket(bucket_name)
def get_all_object_names(self, bucket_name):
return self.__client.get_all_object_names(bucket_name)

View File

@ -0,0 +1,27 @@
from itertools import repeat
class StorageClientMock:
def __init__(self):
self.__data = {}
def make_bucket(self, bucket_name):
self.__data[bucket_name] = {}
def has_bucket(self, bucket_name):
return bucket_name in self.__data
def put_object(self, bucket_name, object_name, data):
self.__data[bucket_name][object_name] = data
def get_object(self, bucket_name, object_name):
return self.__data[bucket_name][object_name]
def get_all_objects(self, bucket_name):
return self.__data[bucket_name].values()
def clear_bucket(self, bucket_name):
self.__data[bucket_name] = {}
def get_all_object_names(self, bucket_name):
return zip(repeat(bucket_name), self.__data[bucket_name])

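The mocks above plug into the same Storage wrapper as the real adapters, which is how the parametrized tests below exercise one interface against all backends; a minimal sketch:

from pyinfra.storage.storage import Storage
from pyinfra.test.storage.adapter_mock import StorageAdapterMock
from pyinfra.test.storage.client_mock import StorageClientMock

storage = Storage(StorageAdapterMock(StorageClientMock()))
storage.make_bucket("pyinfra-test-bucket")
storage.put_object("pyinfra-test-bucket", "file", b"content")
assert storage.get_object("pyinfra-test-bucket", "file") == b"content"
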
View File

View File

@ -0,0 +1,10 @@
import pytest
from pyinfra.storage.adapters.azure import AzureStorageAdapter
from pyinfra.test.storage.client_mock import StorageClientMock
@pytest.fixture
def adapter():
adapter = AzureStorageAdapter(StorageClientMock())
return adapter

View File

@ -0,0 +1,6 @@
import pytest
@pytest.fixture
def bucket_name():
return "pyinfra-test-bucket"

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,82 @@
import logging
import pytest
from pyinfra.config import CONFIG
from pyinfra.exceptions import UnknownClient
from pyinfra.storage.adapters.azure import AzureStorageAdapter
from pyinfra.storage.adapters.s3 import S3StorageAdapter
from pyinfra.storage.clients.azure import get_azure_client
from pyinfra.storage.clients.s3 import get_s3_client
from pyinfra.storage.storage import Storage
from pyinfra.storage.storages import get_azure_storage, get_s3_storage
from pyinfra.test.storage.adapter_mock import StorageAdapterMock
from pyinfra.test.storage.client_mock import StorageClientMock
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"])
class TestStorage:
def test_clearing_bucket_yields_empty_bucket(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
data_received = storage.get_all(bucket_name)
assert not {*data_received}
def test_getting_object_put_in_bucket_is_object(self, storage, bucket_name):
storage.put_object(bucket_name, "file", b"content")
data_received = storage.get_object(bucket_name, "file")
assert b"content" == data_received
def test_getting_nested_object_put_in_bucket_is_nested_object(self, storage, bucket_name):
storage.put_object(bucket_name, "folder/file", b"content")
data_received = storage.get_object(bucket_name, "folder/file")
assert b"content" == data_received
def test_getting_objects_put_in_bucket_are_objects(self, storage, bucket_name):
storage.put_object(bucket_name, "file1", b"content 1")
storage.put_object(bucket_name, "folder/file2", b"content 2")
data_received = storage.get_all(bucket_name)
assert {b"content 1", b"content 2"} == {*data_received}
def test_make_bucket_produces_bucket(self, storage, bucket_name):
storage.make_bucket(bucket_name)
assert storage.bucket_exists(bucket_name)
def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name):
storage.put_object(bucket_name, "file1", b"content 1")
storage.put_object(bucket_name, "file2", b"content 2")
full_names_received = storage.get_all_object_names(bucket_name)
assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received}
def get_adapter(client_name, s3_backend):
if client_name == "mock":
return StorageAdapterMock(StorageClientMock())
if client_name == "azure":
return AzureStorageAdapter(get_azure_client(CONFIG.test.azure))
if client_name == "s3":
return S3StorageAdapter(get_s3_client(CONFIG.test[s3_backend]))
else:
raise UnknownClient(client_name)
@pytest.fixture(params=["minio", "aws"])
def storage(client_name, bucket_name, request):
logger.debug("Setup for storage")
storage = Storage(get_adapter(client_name, request.param))
storage.make_bucket(bucket_name)
storage.clear_bucket(bucket_name)
yield storage
logger.debug("Teardown for storage")
storage.clear_bucket(bucket_name)
def test_get_azure_storage_yields_storage():
assert isinstance(get_azure_storage(), Storage)
def test_get_s3_storage_yields_storage():
assert isinstance(get_s3_storage(), Storage)

View File

@ -2,18 +2,12 @@
import os
def provide_directory(path):
if os.path.isfile(path):
provide_directory(os.path.dirname(path))
if not os.path.isdir(path):
try:
os.makedirs(path)
except FileExistsError:
pass
from pyinfra.config import CONFIG
def produce_compressed_storage_pdf_object_name(path_no_ext, ext="pdf"):
def produce_compressed_storage_pdf_object_name(path_no_ext, ext=None):
if not ext:
ext = CONFIG.service.target_file_extension
return f"{path_no_ext}.ORIGIN.{ext}.gz"
@ -21,9 +15,3 @@ def dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, fil
path_no_ext = os.path.join(dossier_id, file_id)
pdf_object_name = produce_compressed_storage_pdf_object_name(path_no_ext)
return pdf_object_name
def path_to_compressed_storage_pdf_object_name(path):
path_no_ext, ext = os.path.splitext(path)
path_gz = produce_compressed_storage_pdf_object_name(path_no_ext)
return path_gz

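A quick worked example of the helpers above, assuming the default target_file_extension of "pdf" from the config:

dossier_id_and_file_id_to_compressed_storage_pdf_object_name("dossier-1", "file-1")
# -> "dossier-1/file-1.ORIGIN.pdf.gz"
produce_compressed_storage_pdf_object_name("dossier-1/file-1", ext="pdf")
# -> "dossier-1/file-1.ORIGIN.pdf.gz"
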
4
pytest.ini Normal file
View File

@ -0,0 +1,4 @@
[pytest]
log_cli = 1
log_cli_level = DEBUG

View File

@ -1,9 +1,13 @@
import argparse
import gzip
import os
from pathlib import Path
from tqdm import tqdm
from pyinfra.storage.minio import MinioHandle
from pyinfra.config import CONFIG
from pyinfra.storage.storages import get_s3_storage
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
def parse_args():
@ -17,35 +21,39 @@ def parse_args():
add_group.add_argument("--file", "-f")
add_group.add_argument("--directory", "-d")
parser_remove = subparsers.add_parser("remove", help="Remove a file from the MinIO store")
parser_remove.add_argument("dossier_id")
parser_remove.add_argument("file_path")
subparsers.add_parser("purge", help="Delete all files and buckets in the MinIO store")
args = parser.parse_args()
return args
def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
path_gz = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, Path(path).stem)
with open(path, "rb") as f:
data = gzip.compress(f.read())
storage.put_object(bucket_name, path_gz, data)
if __name__ == "__main__":
client = MinioHandle()
storage = get_s3_storage()
bucket_name = CONFIG.test.bucket
if not storage.has_bucket(bucket_name):
storage.make_bucket(bucket_name)
args = parse_args()
if args.command == "add":
if args.file:
client.add_file_compressed(args.file, folder=args.dossier_id)
add_file_compressed(storage, bucket_name, args.dossier_id, args.file)
elif args.directory:
for fname in tqdm([*os.listdir(args.directory)], desc="Adding files"):
path = os.path.join(args.directory, fname)
client.add_file_compressed(path, folder=args.dossier_id)
elif args.command == "remove":
fname = os.path.basename(args.file_path)
client.remove_file(folder=args.dossier_id, filename=fname)
path = Path(args.directory) / fname
add_file_compressed(storage, bucket_name, args.dossier_id, path)
elif args.command == "purge":
client.purge()
storage.clear_bucket(bucket_name)

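For reference, the same upload path can be exercised programmatically; a hedged sketch mirroring add_file_compressed() in the script above (dossier ID and local file path are illustrative):

import gzip
from pathlib import Path
from pyinfra.config import CONFIG
from pyinfra.storage.storages import get_s3_storage
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name

storage = get_s3_storage()
bucket_name = CONFIG.test.bucket  # "pyinfra-test-bucket"
if not storage.bucket_exists(bucket_name):
    storage.make_bucket(bucket_name)

path = "/tmp/file-1.pdf"  # illustrative local file
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name("dossier-1", Path(path).stem)
with open(path, "rb") as f:
    storage.put_object(bucket_name, object_name, gzip.compress(f.read()))
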
View File

@ -1,14 +1,15 @@
import json
from pyinfra.config import CONFIG
from pyinfra.storage.minio import MinioHandle
from pyinfra.rabbitmq import make_channel, declare_queue, make_connection
from pyinfra.storage.storages import get_s3_storage
def build_message_bodies():
minio_client = MinioHandle()
for dossier_id, pdf_name in minio_client.list_folders_and_files():
storage = get_s3_storage()
for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.test.bucket):
file_id = pdf_name.split(".")[0]
dossier_id, file_id = file_id.split("/")
yield json.dumps({"dossierId": dossier_id, "fileId": file_id})

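To make the parsing above concrete, a worked example for an object name produced by the file helpers (values illustrative):

# object name in the bucket:  "dossier-1/file-1.ORIGIN.pdf.gz"
# pdf_name.split(".")[0]   -> "dossier-1/file-1"
# .split("/")              -> dossier_id="dossier-1", file_id="file-1"
# yielded message body     -> '{"dossierId": "dossier-1", "fileId": "file-1"}'
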
View File

@ -13,20 +13,16 @@ from pyinfra.consume import consume, ConsumerError
from pyinfra.core import make_payload_processor, make_storage_data_loader, make_analyzer
from pyinfra.exceptions import UnknownStorageBackend
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle
from pyinfra.storage.minio import MinioHandle
# TODO: implement meaningful checks
from pyinfra.storage.storages import get_azure_storage, get_s3_storage
def get_storage():
storage_backend = CONFIG.service.storage_backend
if storage_backend == "s3":
storage = MinioHandle()
storage = get_s3_storage()
elif storage_backend == "azure":
storage = AzureBlobStorageHandle()
storage = get_azure_storage()
else:
raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.")
@ -44,7 +40,7 @@ def republish(channel, body, n_current_attempts):
def make_callback():
load_data = make_storage_data_loader(get_storage())
load_data = make_storage_data_loader(get_storage(), CONFIG.service.bucket_name)
analyze_file = make_analyzer(CONFIG.service.analysis_endpoint)
json_wrapped_body_processor = make_payload_processor(load_data, analyze_file)
@ -66,6 +62,7 @@ def make_callback():
def main():
# TODO: implement meaningful checks
webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),))
logging.info("Starting webserver...")
webserver.start()