feat(opentelemetry): fastAPI tracing

The tests don't work yet since the webserver has to run in a thread and
the traces don't get exported from the thread with local json exporting.
However, with an export to an external server this should still work.
WIP
This commit is contained in:
Julius Unverfehrt 2024-01-24 15:52:33 +01:00
parent da163897c4
commit e0b32fa448
5 changed files with 60 additions and 46 deletions

View File

@ -2,17 +2,22 @@ from dynaconf import Dynaconf
from fastapi import FastAPI from fastapi import FastAPI
from kn_utils.logging import logger from kn_utils.logging import logger
from pyinfra.config.loader import validate_settings, get_all_validators
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
from pyinfra.queue.manager import QueueManager from pyinfra.queue.manager import QueueManager
from pyinfra.webserver.prometheus import add_prometheus_endpoint, \ from pyinfra.utils.opentelemetry import setup_trace, instrument_pika, instrument_app
make_prometheus_processing_time_decorator_from_settings from pyinfra.webserver.prometheus import (
add_prometheus_endpoint,
make_prometheus_processing_time_decorator_from_settings,
)
from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings
def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf): def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf):
"""Default serving logic for research services. """Default serving logic for research services.
Supplies /health, /ready and /prometheus endpoints. The process_fn is monitored for processing time per call. Supplies /health, /ready and /prometheus endpoints (if enabled). The process_fn is monitored for processing time per
call. Also traces the queue messages (if enabled).
Workload is only received via queue messages. The message contains a file path to the data to be processed, which Workload is only received via queue messages. The message contains a file path to the data to be processed, which
gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should
return a json serializable object. This object is then uploaded to the storage. The response message is just the return a json serializable object. This object is then uploaded to the storage. The response message is just the
@ -20,14 +25,23 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr
Adapt as needed. Adapt as needed.
""" """
validate_settings(settings, get_all_validators())
logger.info(f"Starting webserver and queue consumer...") logger.info(f"Starting webserver and queue consumer...")
app = FastAPI() app = FastAPI()
queue_manager = QueueManager(settings)
if settings.metrics.prometheus.enabled:
logger.info(f"Prometheus metrics enabled.")
app = add_prometheus_endpoint(app) app = add_prometheus_endpoint(app)
process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn)
queue_manager = QueueManager(settings) if settings.tracing.opentelemetry.enabled:
logger.info(f"OpenTelemetry tracing enabled.")
setup_trace(settings)
instrument_pika()
app = add_health_check_endpoint(app, queue_manager.is_ready) app = add_health_check_endpoint(app, queue_manager.is_ready)

View File

@ -2,6 +2,7 @@ import json
from dynaconf import Dynaconf from dynaconf import Dynaconf
from fastapi import FastAPI from fastapi import FastAPI
from kn_utils.logging import logger
from opentelemetry import trace from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
@ -33,10 +34,9 @@ def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExpo
exporter = exporter or get_exporter(settings) exporter = exporter or get_exporter(settings)
resource = Resource(attributes={"service.name": service_name}) resource = Resource(attributes={"service.name": service_name})
provider = TracerProvider(resource=resource) provider = TracerProvider(resource=resource, shutdown_on_exit=True)
processor = BatchSpanProcessor(exporter) processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor) provider.add_span_processor(processor)
# TODO: This produces a warning if trying to set the provider twice. # TODO: This produces a warning if trying to set the provider twice.
@ -51,7 +51,7 @@ def get_exporter(settings: Dynaconf):
if settings.tracing.opentelemetry.exporter == "json": if settings.tracing.opentelemetry.exporter == "json":
return JsonSpanExporter() return JsonSpanExporter()
elif settings.tracing.opentelemetry.exporter == "otlp": elif settings.tracing.opentelemetry.exporter == "otlp":
return OTLPSpanExporter(endpoint=settings.metrics.opentelemetry.endpoint) return OTLPSpanExporter(endpoint=settings.tracing.opentelemetry.endpoint)
elif settings.tracing.opentelemetry.exporter == "console": elif settings.tracing.opentelemetry.exporter == "console":
return ConsoleSpanExporter() return ConsoleSpanExporter()
else: else:

View File

@ -36,16 +36,11 @@ def make_prometheus_processing_time_decorator_from_settings(
postfix: str = "processing_time", postfix: str = "processing_time",
registry: CollectorRegistry = REGISTRY, registry: CollectorRegistry = REGISTRY,
) -> Decorator: ) -> Decorator:
"""Make a decorator for monitoring the processing time of a function. The decorator is only applied if the """Make a decorator for monitoring the processing time of a function. This, and other metrics should follow the
prometheus metrics are enabled in the settings. convention {product name}_{service name}_{processing step / parameter to monitor}.
This, and other metrics should follow the convention
{product name}_{service name}_{processing step / parameter to monitor}.
""" """
validate_settings(settings, validators=prometheus_validators) validate_settings(settings, validators=prometheus_validators)
if not settings.metrics.prometheus.enabled:
return identity
processing_time_sum = Summary( processing_time_sum = Summary(
f"{settings.metrics.prometheus.prefix}_{postfix}", f"{settings.metrics.prometheus.prefix}_{postfix}",
"Summed up processing time per call.", "Summed up processing time per call.",

View File

@ -8,11 +8,15 @@ from fastapi import FastAPI
from pyinfra.config.validators import webserver_validators from pyinfra.config.validators import webserver_validators
from pyinfra.config.loader import validate_settings from pyinfra.config.loader import validate_settings
from pyinfra.utils.opentelemetry import setup_trace, instrument_app
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread: def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
validate_settings(settings, validators=webserver_validators) validate_settings(settings, validators=webserver_validators)
if settings.tracing.opentelemetry.enabled:
return create_webserver_thread_with_tracing(app, settings)
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host) return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
@ -25,6 +29,19 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
return thread return thread
def create_webserver_thread_with_tracing(app: FastAPI, settings: Dynaconf) -> threading.Thread:
def inner():
setup_trace(settings)
instrument_app(app)
uvicorn.run(app, port=settings.webserver.port, host=settings.webserver.host, log_level=logging.WARNING)
thread = threading.Thread(target=inner)
thread.daemon = True
return thread
HealthFunction = Callable[[], bool] HealthFunction = Callable[[], bool]

View File

@ -1,28 +1,8 @@
from time import sleep from time import sleep
import pytest import pytest
import requests
from fastapi import FastAPI
from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika, instrument_app from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika
from pyinfra.webserver.utils import create_webserver_thread_from_settings
@pytest.fixture(scope="class")
def app_with_tracing(settings):
app = FastAPI()
@app.get("/test")
def test():
return {"flat": "earth"}
instrument_app(app)
thread = create_webserver_thread_from_settings(app, settings)
thread.start()
sleep(1)
yield
thread.join(timeout=1)
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@ -33,7 +13,6 @@ def exporter(settings):
class TestOpenTelemetry: class TestOpenTelemetry:
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter): def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
setup_trace(settings, exporter=exporter) setup_trace(settings, exporter=exporter)
instrument_pika() instrument_pika()
@ -53,11 +32,20 @@ class TestOpenTelemetry:
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
) )
def test_webserver_requests_are_traced(self, settings, app_with_tracing, exporter): # def test_webserver_requests_are_traced(self, settings):
settings.tracing.opentelemetry.exporter = "json" # settings.tracing.opentelemetry.exporter = "console"
# settings.tracing.opentelemetry.enabled = True
setup_trace(settings, exporter=exporter) #
# app = FastAPI()
requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test") #
# @app.get("/test")
print(exporter.traces) # def test():
# return {"test": "test"}
#
# thread = create_webserver_thread_from_settings(app, settings)
# thread.start()
# sleep(1)
#
# requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test")
#
# thread.join(timeout=1)