Pull request #12: Config refactoring
Merge in RR/pyinfra from config_refactoring to master
Squashed commit of the following:
commit 22636e5e5df1148004598a268348673537454e36
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Fri Feb 25 14:57:57 2022 +0100
applied black
commit 5d244c3f67fb9d6bd7cb78cbe92fc8035b6cf9b7
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Fri Feb 25 14:56:22 2022 +0100
restructured config and made corresponding changes in referencing code
commit 4dc64124e16e0f490e6785324b88751ee32dc49c
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Fri Feb 25 13:12:41 2022 +0100
test config restructuring
commit b0bd9aebdf58f3f4f43a1d4cdaf451bd6036d135
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Fri Feb 25 13:10:35 2022 +0100
factored out test section of config
This commit is contained in: parent dca206cda5, commit b82ae951a4
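Throughout config.yaml, values follow a `$ENV_VAR|default` convention: the environment variable wins when set, otherwise the literal after the pipe is used. The Config class that resolves these tokens is not part of this diff; below is a minimal sketch of the idea, with the regex and quote-stripping behaviour assumed rather than taken from the repository.

import os
import re

# Hypothetical resolver for the "$ENV_VAR|default" tokens used in config.yaml;
# the repository's actual Config implementation is not shown in this diff.
_TOKEN = re.compile(r"^\$(?P<var>[A-Za-z0-9_]+)\|(?P<default>.*)$")

def resolve(raw: str) -> str:
    match = _TOKEN.match(raw)
    if match is None:
        return raw  # plain literal, nothing to resolve
    default = match.group("default").strip('"')
    return os.environ.get(match.group("var"), default)

# resolve("$RABBITMQ_PORT|5672") -> "5672" unless RABBITMQ_PORT is set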
config.yaml (67 lines changed)
@@ -1,3 +1,11 @@
+service:
+  logging_level: $LOGGING_LEVEL_ROOT|DEBUG  # Logging level for service logger
+
+probing_webserver:
+  host: $PROBING_WEBSERVER_HOST|"0.0.0.0"  # Probe webserver address
+  port: $PROBING_WEBSERVER_PORT|8080  # Probe webserver port
+  mode: $PROBING_WEBSERVER_MODE|production  # Webserver mode: {development, production}
+
 rabbitmq:
   host: $RABBITMQ_HOST|localhost  # RabbitMQ host address
   port: $RABBITMQ_PORT|5672  # RabbitMQ host port
@@ -6,54 +14,31 @@ rabbitmq:
   heartbeat: $RABBITMQ_HEARTBEAT|7200  # Controls the AMQP heartbeat timeout in seconds
 
   queues:
-    input: image_request_queue  # Requests to service
-    output: image_response_queue  # Responses by service
-    dead_letter: image_dead_letter_queue  # Messages that failed to process
+    input: $REQUEST_QUEUE|request_queue  # Requests to service
+    output: $RESPONSE_QUEUE|response_queue  # Responses by service
+    dead_letter: $DEAD_LETTER_QUEUE|dead_letter_queue  # Messages that failed to process
 
   prefetch_count: 1
 
-  retry:  # Controls retry behaviour for messages whose processing failed
-    enabled: $RETRY|False  # Toggles retry behaviour
-    max_attempts: $MAX_ATTEMPTS|3  # Number of times a message may fail before being published to the dead letter queue
-
-s3:
-  endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000"  # MinIO endpoint
-  access_key: $STORAGE_KEY|root  # MinIO user name
-  secret_key: $STORAGE_SECRET|password  # MinIO user password
-  bucket: $STORAGE_BUCKET_NAME|redaction  # MinIO bucket
-
-azure_blob_storage:
-  connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
-  container: $STORAGE_AZURECONTAINERNAME|"pyinfra-test-storage"
-
-service:
-  logging_level: $LOGGING_LEVEL_ROOT|DEBUG  # Logging level for service logger
-  name: $SERVICE_NAME|pyinfra-service-v1  # Name of the service in the Kubernetes cluster
-  storage_backend: $STORAGE_BACKEND|s3  # The storage to pull files to be processed from
-  analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000"
-  bucket_name: $STORAGE_BUCKET_NAME|"pyinfra-test-bucket"  # TODO: Maybe refactor!
-  target_file_extension: $TARGET_FILE_EXTENSION|"pdf"  # Defines the type of file pulled from storage
-
-probing_webserver:
-  host: $PROBING_WEBSERVER_HOST|"0.0.0.0"  # Probe webserver address
-  port: $PROBING_WEBSERVER_PORT|8080  # Probe webserver port
-  mode: $PROBING_WEBSERVER_MODE|production  # Webserver mode: {development, production}
-
-test:
-  minio:
-    s3:
-      endpoint: "http://127.0.0.1:9000"
-      access_key: root
-      secret_key: password
-
-  aws:
-    s3:
-      endpoint: https://s3.amazonaws.com
-      access_key: AKIA4QVP6D4LCDAGYGN2
-      secret_key: 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
-
-  azure:
-    azure_blob_storage:
-      connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
-
-  bucket: "pyinfra-test-bucket"
+  callback:
+    retry:  # Controls retry behaviour for messages whose processing failed
+      # TODO: check if this actually works
+      enabled: $RETRY|False  # Toggles retry behaviour
+      max_attempts: $MAX_ATTEMPTS|3  # Number of times a message may fail before being published to the dead letter queue
+    analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000"
+
+storage:
+  backend: $STORAGE_BACKEND|s3  # The type of storage to use {s3, azure}
+  bucket: $STORAGE_BUCKET|"pyinfra-test-bucket"  # The bucket / container to pull files specified in queue requests from
+  target_file_extension: $TARGET_FILE_EXTENSION|pdf  # Defines the type of file to pull from storage
+
+  s3:
+    endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000"
+    access_key: $STORAGE_KEY|root
+    secret_key: $STORAGE_SECRET|password
+
+  azure:
+    connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
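The restructuring changes lookup paths across the codebase; the renames visible in the hunks below map as follows:

CONFIG.s3.*                           -> CONFIG.storage.s3.*
CONFIG.azure_blob_storage.*           -> CONFIG.storage.azure.*
CONFIG.service.storage_backend        -> CONFIG.storage.backend
CONFIG.service.bucket_name            -> CONFIG.storage.bucket
CONFIG.service.target_file_extension  -> CONFIG.storage.target_file_extension
CONFIG.service.analysis_endpoint      -> CONFIG.rabbitmq.callback.analysis_endpoint
CONFIG.rabbitmq.retry.*               -> CONFIG.rabbitmq.callback.retry.*
CONFIG.test.*                         -> pyinfra/test/config.yaml (separate test CONFIG)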
pyinfra/locations.py

@@ -4,11 +4,11 @@ from pathlib import Path
 MODULE_DIR = Path(__file__).resolve().parents[0]
 
 PACKAGE_ROOT_DIR = MODULE_DIR.parents[0]
 
-DOCKER_COMPOSE_FILE = PACKAGE_ROOT_DIR.joinpath("docker-compose.yaml")
+TEST_DIR = MODULE_DIR / "test"
 
-CONFIG_FILE = PACKAGE_ROOT_DIR.joinpath("config.yaml")
-LOG_FILE = Path("/tmp/log.log")
+CONFIG_FILE = PACKAGE_ROOT_DIR / "config.yaml"
 
-DATA_DIR = PACKAGE_ROOT_DIR.joinpath("data")
+TEST_CONFIG_FILE = TEST_DIR / "config.yaml"
 
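The switch from `joinpath` to the `/` operator is purely stylistic; both produce the same `Path`:

from pathlib import Path

root = Path("/srv/pyinfra")  # hypothetical root, for illustration only
assert root / "config.yaml" == root.joinpath("config.yaml")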
pyinfra/storage/clients/azure.py

@@ -3,7 +3,9 @@ from azure.storage.blob import BlobServiceClient
 from pyinfra.config import CONFIG
 
 
-def get_azure_client(config=None) -> BlobServiceClient:
-    if not config:
-        config = CONFIG
-    return BlobServiceClient.from_connection_string(conn_str=config.azure_blob_storage.connection_string)
+def get_azure_client(connection_string=None) -> BlobServiceClient:
+
+    if not connection_string:
+        connection_string = CONFIG.storage.azure.connection_string
+
+    return BlobServiceClient.from_connection_string(conn_str=connection_string)
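`get_azure_client` now accepts the bare connection string instead of a whole config object, so callers and tests can inject credentials directly. A hedged usage sketch (placeholder credentials, not real values):

from pyinfra.storage.clients.azure import get_azure_client

# No argument: falls back to CONFIG.storage.azure.connection_string.
client = get_azure_client()

# Tests can pass their own connection string instead of patching CONFIG
# (placeholder string for illustration only):
test_client = get_azure_client(
    "DefaultEndpointsProtocol=https;AccountName=example;AccountKey=...;EndpointSuffix=core.windows.net"
)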
pyinfra/storage/clients/s3.py

@@ -17,11 +17,17 @@ def parse_endpoint(endpoint):
     return {"secure": match.group("protocol") == "https", "endpoint": match.group("address")}
 
 
-def get_s3_client(config=None) -> Minio:
-    if not config:
-        config = CONFIG
-
-    endpoint = config.s3.endpoint
-
-    return Minio(**parse_endpoint(endpoint), access_key=config.s3.access_key, secret_key=config.s3.secret_key)
+def get_s3_client(params=None) -> Minio:
+    """
+    Args:
+        params: dict like
+            {
+                "endpoint": <storage_endpoint>
+                "access_key": <storage_key>
+                "secret_key": <storage_secret>
+            }
+    """
+    if not params:
+        params = CONFIG.storage.s3
+
+    return Minio(**parse_endpoint(params.endpoint), access_key=params.access_key, secret_key=params.secret_key)
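One subtlety: the docstring says `params: dict like`, but the body reads `params.endpoint` by attribute, so the argument must be attribute-accessible — a Config node such as `CONFIG.storage.s3` (or `CONFIG.storage[s3_backend]` in the tests below), not a plain dict. A hedged sketch for ad-hoc callers, using values from the test config:

from types import SimpleNamespace

from pyinfra.storage.clients.s3 import get_s3_client

# get_s3_client reads params.endpoint / params.access_key / params.secret_key
# by attribute, so a plain dict must be wrapped first:
params = SimpleNamespace(
    endpoint="http://127.0.0.1:9000",
    access_key="root",
    secret_key="password",
)
client = get_s3_client(params)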
pyinfra/storage/storage.py

@@ -8,7 +8,7 @@ class Storage:
     def make_bucket(self, bucket_name):
         self.__adapter.make_bucket(bucket_name)
 
-    def bucket_exists(self, bucket_name):
+    def has_bucket(self, bucket_name):
         return self.__adapter.has_bucket(bucket_name)
 
     def put_object(self, bucket_name, object_name, data):
@@ -17,7 +17,7 @@ class Storage:
     def get_object(self, bucket_name, object_name):
         return self.__adapter.get_object(bucket_name, object_name)
 
-    def get_all(self, bucket_name):
+    def get_all_objects(self, bucket_name):
         return self.__adapter.get_all_objects(bucket_name)
 
     def clear_bucket(self, bucket_name):
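The renames make the facade mirror the adapter interface (`has_bucket`, `get_all_objects`). A short usage sketch under that assumption, using the S3 factory from this PR:

from pyinfra.storage.storages import get_s3_storage

storage = get_s3_storage()

bucket = "pyinfra-test-bucket"  # default bucket from the new storage section
if not storage.has_bucket(bucket):            # was: bucket_exists
    storage.make_bucket(bucket)

for blob in storage.get_all_objects(bucket):  # was: get_all
    print(len(blob))  # each entry is the object's raw bytes, per the tests in this PR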
pyinfra/storage/storages.py

@@ -1,4 +1,3 @@
-from distutils.command.config import config
 from pyinfra.storage.adapters.azure import AzureStorageAdapter
 from pyinfra.storage.adapters.s3 import S3StorageAdapter
 from pyinfra.storage.clients.azure import get_azure_client
pyinfra/test/config.py (new file, 5 lines)

@@ -0,0 +1,5 @@
+from pyinfra.config import Config
+from pyinfra.locations import TEST_CONFIG_FILE
+
+
+CONFIG = Config(TEST_CONFIG_FILE)
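Tests now build their own Config from the factored-out file instead of reusing the service config. A test module can then do, for example:

from pyinfra.test.config import CONFIG

# Resolves against pyinfra/test/config.yaml, not the service-level config.yaml.
minio_params = CONFIG.storage["minio"]  # also reachable as CONFIG.storage.minio
endpoint = minio_params.endpoint        # "http://127.0.0.1:9000"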
pyinfra/test/config.yaml (new file, 15 lines)

@@ -0,0 +1,15 @@
+storage:
+  minio:
+    endpoint: "http://127.0.0.1:9000"
+    access_key: root
+    secret_key: password
+
+  aws:
+    endpoint: https://s3.amazonaws.com
+    access_key: AKIA4QVP6D4LCDAGYGN2
+    secret_key: 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
+
+  azure:
+    connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
+
+  bucket: "pyinfra-test-bucket"
(storage test module — file header not preserved in this view)

@@ -2,7 +2,6 @@ import logging
 
 import pytest
 
-from pyinfra.config import CONFIG
 from pyinfra.exceptions import UnknownClient
 from pyinfra.storage.adapters.azure import AzureStorageAdapter
 from pyinfra.storage.adapters.s3 import S3StorageAdapter
@@ -10,10 +9,10 @@ from pyinfra.storage.clients.azure import get_azure_client
 from pyinfra.storage.clients.s3 import get_s3_client
 from pyinfra.storage.storage import Storage
 from pyinfra.storage.storages import get_azure_storage, get_s3_storage
+from pyinfra.test.config import CONFIG
 from pyinfra.test.storage.adapter_mock import StorageAdapterMock
 from pyinfra.test.storage.client_mock import StorageClientMock
 
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -22,7 +21,7 @@ logger.setLevel(logging.DEBUG)
 class TestStorage:
     def test_clearing_bucket_yields_empty_bucket(self, storage, bucket_name):
         storage.clear_bucket(bucket_name)
-        data_received = storage.get_all(bucket_name)
+        data_received = storage.get_all_objects(bucket_name)
         assert not {*data_received}
 
     def test_getting_object_put_in_bucket_is_object(self, storage, bucket_name):
@@ -38,12 +37,12 @@ class TestStorage:
     def test_getting_objects_put_in_bucket_are_objects(self, storage, bucket_name):
         storage.put_object(bucket_name, "file1", b"content 1")
         storage.put_object(bucket_name, "folder/file2", b"content 2")
-        data_received = storage.get_all(bucket_name)
+        data_received = storage.get_all_objects(bucket_name)
         assert {b"content 1", b"content 2"} == {*data_received}
 
     def test_make_bucket_produces_bucket(self, storage, bucket_name):
         storage.make_bucket(bucket_name)
-        assert storage.bucket_exists(bucket_name)
+        assert storage.has_bucket(bucket_name)
 
     def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name):
         storage.put_object(bucket_name, "file1", b"content 1")
@@ -56,9 +55,9 @@ def get_adapter(client_name, s3_backend):
     if client_name == "mock":
         return StorageAdapterMock(StorageClientMock())
     if client_name == "azure":
-        return AzureStorageAdapter(get_azure_client(CONFIG.test.azure))
+        return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
     if client_name == "s3":
-        return S3StorageAdapter(get_s3_client(CONFIG.test[s3_backend]))
+        return S3StorageAdapter(get_s3_client(CONFIG.storage[s3_backend]))
     else:
         raise UnknownClient(client_name)
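With backends keyed by name, `get_adapter` can be driven by parametrized tests; a hedged sketch of such wiring (the parameter sets are assumed, not shown in this diff):

import pytest

# Assumes get_adapter is importable from the test module shown above.
@pytest.mark.parametrize(
    ("client_name", "s3_backend"),
    [
        ("mock", None),
        ("s3", "minio"),  # resolves CONFIG.storage["minio"]
        ("s3", "aws"),    # resolves CONFIG.storage["aws"]
        ("azure", None),
    ],
)
def test_adapter_selection(client_name, s3_backend):
    adapter = get_adapter(client_name, s3_backend)
    assert adapter is not None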
(storage seeding script — file header not preserved in this view)

@@ -7,7 +7,7 @@ from pyinfra.config import CONFIG
 
 def produce_compressed_storage_pdf_object_name(path_no_ext, ext=None):
     if not ext:
-        ext = CONFIG.service.target_file_extension
+        ext = CONFIG.storage.target_file_extension
     return f"{path_no_ext}.ORIGIN.{ext}.gz"
 
 
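With the default extension from the new storage section (`target_file_extension: $TARGET_FILE_EXTENSION|pdf`), a call like `produce_compressed_storage_pdf_object_name("dossier1/file1")` (hypothetical arguments) yields `"dossier1/file1.ORIGIN.pdf.gz"`.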
@@ -39,7 +39,8 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
 
 if __name__ == "__main__":
 
     storage = get_s3_storage()
-    bucket_name = CONFIG.test.bucket
+    bucket_name = CONFIG.storage.bucket
+
     if not storage.has_bucket(bucket_name):
         storage.make_bucket(bucket_name)
(message-publishing script — file header not preserved in this view)

@@ -7,10 +7,10 @@ from pyinfra.storage.storages import get_s3_storage
 
 def build_message_bodies():
     storage = get_s3_storage()
-    for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.test.bucket):
+    for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket):
         file_id = pdf_name.split(".")[0]
         dossier_id, file_id = file_id.split("/")
-        yield json.dumps({"dossierId": dossier_id, "fileId": file_id})
+        yield json.dumps({"dossierId": dossier_id, "fileId": file_id}).encode()
 
 
 if __name__ == "__main__":
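The appended `.encode()` turns the JSON string into bytes before publishing, since AMQP message bodies are bytes. A hedged sketch of the publish side, assuming pika is the RabbitMQ client behind the `channel` objects seen in serve.py:

import json

import pika  # assumption: pika is the AMQP client used with RabbitMQ here

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="request_queue")  # default input queue from config.yaml

body = json.dumps({"dossierId": "d1", "fileId": "f1"}).encode()  # bytes, not str
channel.basic_publish(exchange="", routing_key="request_queue", body=body)
connection.close()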
src/serve.py (10 lines changed)
@@ -18,7 +18,7 @@ from pyinfra.storage.storages import get_azure_storage, get_s3_storage
 
 def get_storage():
 
-    storage_backend = CONFIG.service.storage_backend
+    storage_backend = CONFIG.storage.backend
     if storage_backend == "s3":
         storage = get_s3_storage()
     elif storage_backend == "azure":
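Because the backend is env-driven (`$STORAGE_BACKEND|s3`), a deployment flips between stores without code changes; a sketch assuming the Config layer resolves tokens from the environment at load time:

import os

# Select the Azure branch of get_storage() for this process. This assumes the
# $STORAGE_BACKEND|s3 token in config.yaml is resolved against the environment
# when the config is loaded, so it must run before the config is read.
os.environ["STORAGE_BACKEND"] = "azure"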
@@ -40,13 +40,13 @@ def republish(channel, body, n_current_attempts):
 
 def make_callback():
 
-    load_data = make_storage_data_loader(get_storage(), CONFIG.service.bucket_name)
-    analyze_file = make_analyzer(CONFIG.service.analysis_endpoint)
+    load_data = make_storage_data_loader(get_storage(), CONFIG.storage.bucket)
+    analyze_file = make_analyzer(CONFIG.rabbitmq.callback.analysis_endpoint)
 
     json_wrapped_body_processor = make_payload_processor(load_data, analyze_file)
 
-    if CONFIG.rabbitmq.retry.enabled:
-        retry_callback = make_retry_callback(republish, max_attempts=CONFIG.rabbitmq.retry.max_attempts)
+    if CONFIG.rabbitmq.callback.retry.enabled:
+        retry_callback = make_retry_callback(republish, max_attempts=CONFIG.rabbitmq.callback.retry.max_attempts)
 
     callback = make_retry_callback_for_output_queue(
         json_wrapped_body_processor=json_wrapped_body_processor,