61 lines
2.1 KiB
Python
61 lines
2.1 KiB
Python
import logging
|
|
import threading
|
|
from typing import Callable
|
|
|
|
import uvicorn
|
|
from dynaconf import Dynaconf
|
|
from fastapi import FastAPI
|
|
|
|
from pyinfra.config.loader import validate_settings
|
|
from pyinfra.config.validators import webserver_validators
|
|
from pyinfra.utils.opentelemetry import instrument_app, setup_trace
|
|
|
|
|
|
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
|
validate_settings(settings, validators=webserver_validators)
|
|
|
|
if settings.tracing.opentelemetry.enabled:
|
|
return create_webserver_thread_with_tracing(app, settings)
|
|
|
|
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
|
|
|
|
|
|
def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thread:
|
|
"""Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
|
|
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
|
|
"""
|
|
thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING))
|
|
thread.daemon = True
|
|
return thread
|
|
|
|
|
|
def create_webserver_thread_with_tracing(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
|
def inner():
|
|
setup_trace(settings)
|
|
instrument_app(app)
|
|
uvicorn.run(app, port=settings.webserver.port, host=settings.webserver.host, log_level=logging.WARNING)
|
|
|
|
thread = threading.Thread(target=inner)
|
|
thread.daemon = True
|
|
|
|
return thread
|
|
|
|
|
|
HealthFunction = Callable[[], bool]
|
|
|
|
|
|
def add_health_check_endpoint(app: FastAPI, health_function: HealthFunction) -> FastAPI:
|
|
"""Add a health check endpoint to the app. The health function should return True if the service is healthy,
|
|
and False otherwise. The health function is called when the endpoint is hit.
|
|
"""
|
|
|
|
@app.get("/health")
|
|
@app.get("/ready")
|
|
def check_health():
|
|
if health_function():
|
|
return {"status": "OK"}, 200
|
|
else:
|
|
return {"status": "Service Unavailable"}, 503
|
|
|
|
return app
|