finish config loading logic

Julius Unverfehrt 2024-01-19 14:04:56 +01:00
parent 9c2f34e694
commit 87cbf89672
9 changed files with 77 additions and 60 deletions

View File

@@ -1,23 +1,56 @@
 import os
 from pathlib import Path
+from typing import Union
 
-from dynaconf import Dynaconf
-import funcy
+from dynaconf import Dynaconf, ValidationError
+from funcy import merge, lflatten
 from kn_utils.logging import logger
 
 
-def load_settings():
-    # TODO: Make dynamic, so that the settings.toml file can be loaded from any location
-    # TODO: add validation
-    root_path = Path(__file__).resolve().parents[1]  # this is pyinfra/
-    repo_root_path = root_path.parents[0]  # this is the root of the repo
-    os.environ["ROOT_PATH"] = str(root_path)
-    os.environ["REPO_ROOT_PATH"] = str(repo_root_path)
+def load_settings(settings_path: Union[str, Path] = None):
+    if not settings_path:
+        repo_root_path = Path(__file__).resolve().parents[2]
+        settings_path = repo_root_path / "config/"
+        logger.info(f"No settings path provided, using relative settings path: {settings_path}")
+
+    settings_path = Path(settings_path)
+    if os.path.isdir(settings_path):
+        logger.info(f"Settings path is a directory, loading all .toml files in the directory: {settings_path}")
+        settings_files = list(settings_path.glob("*.toml"))
+    else:
+        logger.info(f"Settings path is a file, loading only the specified file: {settings_path}")
+        settings_files = [settings_path]
+
     settings = Dynaconf(
         load_dotenv=True,
         envvar_prefix=False,
-        settings_files=[
-            repo_root_path / "config" / "settings.toml",
-        ],
+        settings_files=settings_files,
     )
+    validate_settings(settings, get_all_validators())
     return settings
+
+
+def get_all_validators():
+    import pyinfra.config.validators
+
+    return lflatten(validator for validator in pyinfra.config.validators.__dict__.values() if isinstance(validator, list))
+
+
+def validate_settings(settings: Dynaconf, validators):
+    settings_valid = True
+    for validator in validators:
+        try:
+            validator.validate(settings)
+        except ValidationError as e:
+            settings_valid = False
+            logger.warning(e)
+    if not settings_valid:
+        raise ValidationError("Settings validation failed.")
+    logger.debug("Settings validated.")
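
The reworked loader accepts either a directory of .toml files or a single file, and validates everything it loads. A minimal usage sketch (the /etc/myservice paths are assumed for illustration, not taken from the diff):

from pathlib import Path

from pyinfra.config.loader import load_settings

# Point at a directory: every *.toml file inside is handed to Dynaconf.
settings = load_settings("/etc/myservice/config")

# Or point at a single settings file:
settings = load_settings(Path("/etc/myservice/config/settings.toml"))

# With no argument, the loader falls back to <repo_root>/config/.
# Either way it runs all registered validators and raises
# dynaconf.ValidationError if a required key is missing.
settings = load_settings()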

View File

@@ -1,5 +1,4 @@
-from dynaconf import Validator, Dynaconf, ValidationError
-from kn_utils.logging import logger
+from dynaconf import Validator
 
 queue_manager_validators = [
     Validator("rabbitmq.host", must_exist=True),
@@ -45,19 +44,3 @@ webserver_validators = [
     Validator("webserver.host", must_exist=True),
     Validator("webserver.port", must_exist=True),
 ]
-
-
-def validate_settings(settings: Dynaconf, validators):
-    settings_valid = True
-    for validator in validators:
-        try:
-            validator.validate(settings)
-        except ValidationError as e:
-            settings_valid = False
-            logger.warning(e)
-    if not settings_valid:
-        raise ValidationError("Settings validation failed.")
-    logger.info("Settings validated.")
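
Since get_all_validators() in the loader scans pyinfra.config.validators for every module-level list, a new validator group needs no explicit registration. A sketch of what such a group could look like (the database.* keys are assumed, purely for illustration):

from dynaconf import Validator

# Any module-level list of Validators in pyinfra.config.validators is
# picked up automatically by get_all_validators() and enforced on load.
database_validators = [
    Validator("database.host", must_exist=True),
    Validator("database.port", must_exist=True, is_type_of=int),
]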

View File

@@ -13,7 +13,8 @@ from kn_utils.logging import logger
 from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
 from retry import retry
 
-from pyinfra.config.validation import queue_manager_validators, validate_settings
+from pyinfra.config.validators import queue_manager_validators
+from pyinfra.config.loader import validate_settings
 
 pika_logger = logging.getLogger("pika")
 pika_logger.setLevel(logging.WARNING)  # disables non-informative pika log clutter
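
The same two-import pattern repeats across the consumer modules below: validator lists come from pyinfra.config.validators, while validate_settings now lives in pyinfra.config.loader. A minimal sketch of how a caller combines them:

from pyinfra.config.loader import load_settings, validate_settings
from pyinfra.config.validators import queue_manager_validators

settings = load_settings()
# Logs each individual failure, then raises dynaconf.ValidationError
# if any of the rabbitmq.* keys are missing:
validate_settings(settings, queue_manager_validators)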

View File

@@ -8,7 +8,8 @@ from pyinfra.storage.storages.azure import get_azure_storage_from_settings
 from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
 from pyinfra.storage.storages.storage import Storage
 from pyinfra.utils.cipher import decrypt
-from pyinfra.config.validation import storage_validators, multi_tenant_storage_validators, validate_settings
+from pyinfra.config.validators import storage_validators, multi_tenant_storage_validators
+from pyinfra.config.loader import validate_settings
 
 
 def get_storage(settings: Dynaconf, tenant_id: str = None) -> Storage:

View File

@@ -8,7 +8,8 @@ from kn_utils.logging import logger
 from retry import retry
 
 from pyinfra.storage.storages.storage import Storage
-from pyinfra.config.validation import azure_storage_validators, validate_settings
+from pyinfra.config.validators import azure_storage_validators
+from pyinfra.config.loader import validate_settings
 
 logging.getLogger("azure").setLevel(logging.WARNING)
 logging.getLogger("urllib3").setLevel(logging.WARNING)

View File

@@ -8,7 +8,8 @@ from minio import Minio
 from retry import retry
 
 from pyinfra.storage.storages.storage import Storage
-from pyinfra.config.validation import s3_storage_validators, validate_settings
+from pyinfra.config.validators import s3_storage_validators
+from pyinfra.config.loader import validate_settings
 from pyinfra.utils.url_parsing import validate_and_parse_s3_endpoint

View File

@@ -7,7 +7,8 @@ from funcy import identity
 from prometheus_client import generate_latest, CollectorRegistry, REGISTRY, Summary
 from starlette.responses import Response
 
-from pyinfra.config.validation import prometheus_validators, validate_settings
+from pyinfra.config.validators import prometheus_validators
+from pyinfra.config.loader import validate_settings
 
 
 def add_prometheus_endpoint(app: FastAPI, registry: CollectorRegistry = REGISTRY) -> FastAPI:
@@ -31,28 +32,24 @@ Decorator = TypeVar("Decorator", bound=Callable[[Callable], Callable])
 def make_prometheus_processing_time_decorator_from_settings(
-    settings: Dynaconf, registry: CollectorRegistry = REGISTRY
+    settings: Dynaconf,
+    postfix: str = "processing_time",
+    registry: CollectorRegistry = REGISTRY,
 ) -> Decorator:
     """Make a decorator for monitoring the processing time of a function. The decorator is only applied if the
     prometheus metrics are enabled in the settings.
+
+    This, and other metrics should follow the convention
+    {product name}_{service name}_{processing step / parameter to monitor}.
     """
     validate_settings(settings, validators=prometheus_validators)
     if not settings.metrics.prometheus.enabled:
         return identity
-    return make_prometheus_processing_time_decorator(
-        prefix=settings.metrics.prometheus.prefix,
-        registry=registry,
-    )
-
-
-def make_prometheus_processing_time_decorator(
-    prefix: str = "readactmanager_research_service",
-    registry: CollectorRegistry = REGISTRY,
-) -> Decorator:
+
     processing_time_sum = Summary(
-        f"{prefix}_processing_time", "Summed up average processing time per call.", registry=registry
+        f"{settings.metrics.prometheus.prefix}_{postfix}",
+        "Summed up processing time per call.",
+        registry=registry,
     )
 
     def decorator(process_fn: Callable) -> Callable:
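
With the new postfix parameter, one settings-driven factory can register several metrics that follow the {prefix}_{postfix} naming convention. A usage sketch (the prefix value and the ocr_time postfix are assumed):

# Assuming settings.metrics.prometheus.prefix == "myproduct_myservice",
# this registers a Summary named "myproduct_myservice_ocr_time"; if
# metrics are disabled in the settings, the decorator is a no-op.
timed = make_prometheus_processing_time_decorator_from_settings(settings, postfix="ocr_time")

@timed
def run_ocr(_data: dict, _message: dict) -> dict:
    return {"text": "..."}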

View File

@@ -6,7 +6,8 @@ import uvicorn
 from dynaconf import Dynaconf
 from fastapi import FastAPI
 
-from pyinfra.config.validation import webserver_validators, validate_settings
+from pyinfra.config.validators import webserver_validators
+from pyinfra.config.loader import validate_settings
 
 
 def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:

View File

@@ -1,30 +1,29 @@
 import logging
 import time
+from pathlib import Path
+from typing import Union
 
 from fastapi import FastAPI
 
 from pyinfra.config.loader import load_settings
-from pyinfra.webserver.prometheus import make_prometheus_processing_time_decorator_from_settings, add_prometheus_endpoint
 from pyinfra.queue.callback import make_queue_message_callback
 from pyinfra.queue.manager import QueueManager
+from pyinfra.webserver.prometheus import (
+    make_prometheus_processing_time_decorator_from_settings,
+    add_prometheus_endpoint,
+)
 from pyinfra.webserver.utils import create_webserver_thread_from_settings, add_health_check_endpoint
 
 logging.basicConfig()
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
-settings = load_settings()
-
-
-@make_prometheus_processing_time_decorator_from_settings(settings)
-def json_processor_mock(_data: dict, _message: dict) -> dict:
+
+def processor_mock(_data: dict, _message: dict) -> dict:
     time.sleep(5)
     return {"result1": "result1"}
 
 
-def main():
+def start_serving(process_fn, settings_path: Union[str, Path] = None):
+    settings = load_settings(settings_path)
     app = FastAPI()
     app = add_prometheus_endpoint(app)
+    process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn)
     queue_manager = QueueManager(settings)
@@ -33,9 +32,9 @@ def main():
     webserver_thread = create_webserver_thread_from_settings(app, settings)
     webserver_thread.start()
 
-    callback = make_queue_message_callback(json_processor_mock, settings)
+    callback = make_queue_message_callback(process_fn, settings)
     queue_manager.start_consuming(callback)
 
 
 if __name__ == "__main__":
-    main()
+    start_serving(processor_mock)
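
A service built on this module now only supplies its processing function and, optionally, a settings path; the mock above doubles as a template. A hypothetical entry point (handler body and settings path are assumed, and the import path of start_serving depends on where this module lives):

def handle_message(data: dict, _message: dict) -> dict:
    # Domain-specific work would go here; this echo is purely illustrative.
    return {"received_keys": list(data)}

if __name__ == "__main__":
    start_serving(handle_message, settings_path="/etc/myservice/config")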