pyinfra/pyinfra/utils/config_validation.py
2024-01-17 17:39:53 +01:00

56 lines
1.7 KiB
Python

from dynaconf import Validator, Dynaconf, ValidationError
from kn_utils.logging import logger
queue_manager_validators = [
Validator("rabbitmq.host", must_exist=True),
Validator("rabbitmq.port", must_exist=True),
Validator("rabbitmq.username", must_exist=True),
Validator("rabbitmq.password", must_exist=True),
Validator("rabbitmq.heartbeat", must_exist=True),
Validator("rabbitmq.connection_sleep", must_exist=True),
Validator("rabbitmq.input_queue", must_exist=True),
Validator("rabbitmq.output_queue", must_exist=True),
Validator("rabbitmq.dead_letter_queue", must_exist=True),
]
azure_storage_validators = [
Validator("storage.azure.connection_string", must_exist=True),
]
s3_storage_validators = [
Validator("storage.s3.endpoint", must_exist=True),
Validator("storage.s3.key", must_exist=True),
Validator("storage.s3.secret", must_exist=True),
Validator("storage.s3.region", must_exist=True),
]
storage_validators = [
Validator("storage.backend", must_exist=True),
]
prometheus_validators = [
Validator("metrics.prometheus.prefix", must_exist=True),
Validator("metrics.prometheus.enabled", must_exist=True),
]
webserver_validators = [
Validator("webserver.host", must_exist=True),
Validator("webserver.port", must_exist=True),
]
def validate_settings(settings: Dynaconf, validators):
settings_valid = True
for validator in validators:
try:
validator.validate(settings)
except ValidationError as e:
settings_valid = False
logger.warning(e)
if not settings_valid:
raise ValidationError("Settings validation failed.")
logger.info("Settings validated.")