diff --git a/pyinfra/config/loader.py b/pyinfra/config/loader.py index a66bbd7..a705927 100644 --- a/pyinfra/config/loader.py +++ b/pyinfra/config/loader.py @@ -1,23 +1,56 @@ import os from pathlib import Path +from typing import Union -from dynaconf import Dynaconf +import funcy +from dynaconf import Dynaconf, ValidationError +from funcy import merge, lflatten +from kn_utils.logging import logger -def load_settings(): - # TODO: Make dynamic, so that the settings.toml file can be loaded from any location - # TODO: add validation - root_path = Path(__file__).resolve().parents[1] # this is pyinfra/ - repo_root_path = root_path.parents[0] # this is the root of the repo - os.environ["ROOT_PATH"] = str(root_path) - os.environ["REPO_ROOT_PATH"] = str(repo_root_path) +def load_settings(settings_path: Union[str, Path] = None): + if not settings_path: + repo_root_path = Path(__file__).resolve().parents[2] + settings_path = repo_root_path / "config/" + logger.info(f"No settings path provided, using relative settings path: {settings_path}") + + settings_path = Path(settings_path) + + if os.path.isdir(settings_path): + logger.info(f"Settings path is a directory, loading all .toml files in the directory: {settings_path}") + settings_files = list(settings_path.glob("*.toml")) + else: + logger.info(f"Settings path is a file, loading only the specified file: {settings_path}") + settings_files = [settings_path] settings = Dynaconf( load_dotenv=True, envvar_prefix=False, - settings_files=[ - repo_root_path / "config" / "settings.toml", - ], + settings_files=settings_files, ) + validate_settings(settings, get_all_validators()) + return settings + + +def get_all_validators(): + import pyinfra.config.validators + + return lflatten(validator for validator in pyinfra.config.validators.__dict__.values() if isinstance(validator, list)) + + +def validate_settings(settings: Dynaconf, validators): + settings_valid = True + + for validator in validators: + try: + validator.validate(settings) + except ValidationError as e: + settings_valid = False + logger.warning(e) + + if not settings_valid: + raise ValidationError("Settings validation failed.") + + logger.debug("Settings validated.") diff --git a/pyinfra/config/validation.py b/pyinfra/config/validators.py similarity index 75% rename from pyinfra/config/validation.py rename to pyinfra/config/validators.py index 629891d..bdc6204 100644 --- a/pyinfra/config/validation.py +++ b/pyinfra/config/validators.py @@ -1,5 +1,4 @@ -from dynaconf import Validator, Dynaconf, ValidationError -from kn_utils.logging import logger +from dynaconf import Validator queue_manager_validators = [ Validator("rabbitmq.host", must_exist=True), @@ -45,19 +44,3 @@ webserver_validators = [ Validator("webserver.host", must_exist=True), Validator("webserver.port", must_exist=True), ] - - -def validate_settings(settings: Dynaconf, validators): - settings_valid = True - - for validator in validators: - try: - validator.validate(settings) - except ValidationError as e: - settings_valid = False - logger.warning(e) - - if not settings_valid: - raise ValidationError("Settings validation failed.") - - logger.info("Settings validated.") diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index 34d5313..ea9c1b4 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -13,7 +13,8 @@ from kn_utils.logging import logger from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection from retry import retry -from pyinfra.config.validation import queue_manager_validators, validate_settings +from pyinfra.config.validators import queue_manager_validators +from pyinfra.config.loader import validate_settings pika_logger = logging.getLogger("pika") pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter diff --git a/pyinfra/storage/connection.py b/pyinfra/storage/connection.py index b7c5845..abe0d5f 100644 --- a/pyinfra/storage/connection.py +++ b/pyinfra/storage/connection.py @@ -8,7 +8,8 @@ from pyinfra.storage.storages.azure import get_azure_storage_from_settings from pyinfra.storage.storages.s3 import get_s3_storage_from_settings from pyinfra.storage.storages.storage import Storage from pyinfra.utils.cipher import decrypt -from pyinfra.config.validation import storage_validators, multi_tenant_storage_validators, validate_settings +from pyinfra.config.validators import storage_validators, multi_tenant_storage_validators +from pyinfra.config.loader import validate_settings def get_storage(settings: Dynaconf, tenant_id: str = None) -> Storage: diff --git a/pyinfra/storage/storages/azure.py b/pyinfra/storage/storages/azure.py index 5689a51..1207d9a 100644 --- a/pyinfra/storage/storages/azure.py +++ b/pyinfra/storage/storages/azure.py @@ -8,7 +8,8 @@ from kn_utils.logging import logger from retry import retry from pyinfra.storage.storages.storage import Storage -from pyinfra.config.validation import azure_storage_validators, validate_settings +from pyinfra.config.validators import azure_storage_validators +from pyinfra.config.loader import validate_settings logging.getLogger("azure").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) diff --git a/pyinfra/storage/storages/s3.py b/pyinfra/storage/storages/s3.py index 52f3957..0a6636e 100644 --- a/pyinfra/storage/storages/s3.py +++ b/pyinfra/storage/storages/s3.py @@ -8,7 +8,8 @@ from minio import Minio from retry import retry from pyinfra.storage.storages.storage import Storage -from pyinfra.config.validation import s3_storage_validators, validate_settings +from pyinfra.config.validators import s3_storage_validators +from pyinfra.config.loader import validate_settings from pyinfra.utils.url_parsing import validate_and_parse_s3_endpoint diff --git a/pyinfra/webserver/prometheus.py b/pyinfra/webserver/prometheus.py index f9bddd4..a274dbf 100644 --- a/pyinfra/webserver/prometheus.py +++ b/pyinfra/webserver/prometheus.py @@ -7,7 +7,8 @@ from funcy import identity from prometheus_client import generate_latest, CollectorRegistry, REGISTRY, Summary from starlette.responses import Response -from pyinfra.config.validation import prometheus_validators, validate_settings +from pyinfra.config.validators import prometheus_validators +from pyinfra.config.loader import validate_settings def add_prometheus_endpoint(app: FastAPI, registry: CollectorRegistry = REGISTRY) -> FastAPI: @@ -31,28 +32,24 @@ Decorator = TypeVar("Decorator", bound=Callable[[Callable], Callable]) def make_prometheus_processing_time_decorator_from_settings( - settings: Dynaconf, registry: CollectorRegistry = REGISTRY + settings: Dynaconf, + postfix: str = "processing_time", + registry: CollectorRegistry = REGISTRY, ) -> Decorator: """Make a decorator for monitoring the processing time of a function. The decorator is only applied if the prometheus metrics are enabled in the settings. + This, and other metrics should follow the convention + {product name}_{service name}_{processing step / parameter to monitor}. """ validate_settings(settings, validators=prometheus_validators) if not settings.metrics.prometheus.enabled: return identity - return make_prometheus_processing_time_decorator( - prefix=settings.metrics.prometheus.prefix, - registry=registry, - ) - - -def make_prometheus_processing_time_decorator( - prefix: str = "readactmanager_research_service", - registry: CollectorRegistry = REGISTRY, -) -> Decorator: processing_time_sum = Summary( - f"{prefix}_processing_time", "Summed up average processing time per call.", registry=registry + f"{settings.metrics.prometheus.prefix}_{postfix}", + "Summed up processing time per call.", + registry=registry, ) def decorator(process_fn: Callable) -> Callable: diff --git a/pyinfra/webserver/utils.py b/pyinfra/webserver/utils.py index fc7534b..fc8a16c 100644 --- a/pyinfra/webserver/utils.py +++ b/pyinfra/webserver/utils.py @@ -6,7 +6,8 @@ import uvicorn from dynaconf import Dynaconf from fastapi import FastAPI -from pyinfra.config.validation import webserver_validators, validate_settings +from pyinfra.config.validators import webserver_validators +from pyinfra.config.loader import validate_settings def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread: diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py index 86fca6c..fa2d5de 100644 --- a/scripts/start_pyinfra.py +++ b/scripts/start_pyinfra.py @@ -1,30 +1,29 @@ -import logging import time +from pathlib import Path +from typing import Union from fastapi import FastAPI from pyinfra.config.loader import load_settings -from pyinfra.webserver.prometheus import make_prometheus_processing_time_decorator_from_settings, add_prometheus_endpoint from pyinfra.queue.callback import make_queue_message_callback from pyinfra.queue.manager import QueueManager +from pyinfra.webserver.prometheus import ( + make_prometheus_processing_time_decorator_from_settings, + add_prometheus_endpoint, +) from pyinfra.webserver.utils import create_webserver_thread_from_settings, add_health_check_endpoint -logging.basicConfig() -logger = logging.getLogger() -logger.setLevel(logging.INFO) -settings = load_settings() - - -@make_prometheus_processing_time_decorator_from_settings(settings) -def json_processor_mock(_data: dict, _message: dict) -> dict: +def processor_mock(_data: dict, _message: dict) -> dict: time.sleep(5) return {"result1": "result1"} -def main(): +def start_serving(process_fn, settings_path: Union[str, Path] = None): + settings = load_settings(settings_path) app = FastAPI() app = add_prometheus_endpoint(app) + process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) queue_manager = QueueManager(settings) @@ -33,9 +32,9 @@ def main(): webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread.start() - callback = make_queue_message_callback(json_processor_mock, settings) + callback = make_queue_message_callback(process_fn, settings) queue_manager.start_consuming(callback) if __name__ == "__main__": - main() + start_serving(processor_mock)