From e0b32fa4488b919c87746737abc13e9993419831 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Wed, 24 Jan 2024 15:52:33 +0100 Subject: [PATCH] feat(opentelemetry): FastAPI tracing The tests don't work yet: the webserver has to run in a thread, and traces created in that thread are not exported by the local JSON exporter. However, exporting to an external server should still work. WIP --- pyinfra/examples.py | 26 +++++++++++---- pyinfra/utils/opentelemetry.py | 6 ++-- pyinfra/webserver/prometheus.py | 9 ++--- pyinfra/webserver/utils.py | 17 ++++++++++ tests/unit_test/opentelemetry_test.py | 48 ++++++++++----------------- 5 files changed, 60 insertions(+), 46 deletions(-) diff --git a/pyinfra/examples.py b/pyinfra/examples.py index e147135..ab00c4f 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -2,17 +2,22 @@ from dynaconf import Dynaconf from fastapi import FastAPI from kn_utils.logging import logger +from pyinfra.config.loader import validate_settings, get_all_validators from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor from pyinfra.queue.manager import QueueManager -from pyinfra.webserver.prometheus import add_prometheus_endpoint, \ - make_prometheus_processing_time_decorator_from_settings +from pyinfra.utils.opentelemetry import setup_trace, instrument_pika, instrument_app +from pyinfra.webserver.prometheus import ( + add_prometheus_endpoint, + make_prometheus_processing_time_decorator_from_settings, +) from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf): """Default serving logic for research services. - Supplies /health, /ready and /prometheus endpoints. The process_fn is monitored for processing time per call. + Supplies /health, /ready and /prometheus endpoints (if enabled). The process_fn is monitored for processing time per + call. 
Also traces the queue messages (if enabled). Workload is only received via queue messages. The message contains a file path to the data to be processed, which gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should return a json serializable object. This object is then uploaded to the storage. The response message is just the @@ -20,15 +25,24 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr Adapt as needed. """ + validate_settings(settings, get_all_validators()) + logger.info(f"Starting webserver and queue consumer...") app = FastAPI() - app = add_prometheus_endpoint(app) - process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) - queue_manager = QueueManager(settings) + if settings.metrics.prometheus.enabled: + logger.info(f"Prometheus metrics enabled.") + app = add_prometheus_endpoint(app) + process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) + + if settings.tracing.opentelemetry.enabled: + logger.info(f"OpenTelemetry tracing enabled.") + setup_trace(settings) + instrument_pika() + app = add_health_check_endpoint(app, queue_manager.is_ready) webserver_thread = create_webserver_thread_from_settings(app, settings) diff --git a/pyinfra/utils/opentelemetry.py b/pyinfra/utils/opentelemetry.py index c96d8e5..c994ea1 100644 --- a/pyinfra/utils/opentelemetry.py +++ b/pyinfra/utils/opentelemetry.py @@ -2,6 +2,7 @@ import json from dynaconf import Dynaconf from fastapi import FastAPI +from kn_utils.logging import logger from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor @@ -33,10 +34,9 @@ def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExpo exporter = exporter or get_exporter(settings) resource = Resource(attributes={"service.name": service_name}) 
- provider = TracerProvider(resource=resource) + provider = TracerProvider(resource=resource, shutdown_on_exit=True) processor = BatchSpanProcessor(exporter) - provider.add_span_processor(processor) # TODO: This produces a warning if trying to set the provider twice. @@ -51,7 +51,7 @@ def get_exporter(settings: Dynaconf): if settings.tracing.opentelemetry.exporter == "json": return JsonSpanExporter() elif settings.tracing.opentelemetry.exporter == "otlp": - return OTLPSpanExporter(endpoint=settings.metrics.opentelemetry.endpoint) + return OTLPSpanExporter(endpoint=settings.tracing.opentelemetry.endpoint) elif settings.tracing.opentelemetry.exporter == "console": return ConsoleSpanExporter() else: diff --git a/pyinfra/webserver/prometheus.py b/pyinfra/webserver/prometheus.py index a274dbf..4bd52dc 100644 --- a/pyinfra/webserver/prometheus.py +++ b/pyinfra/webserver/prometheus.py @@ -36,16 +36,11 @@ def make_prometheus_processing_time_decorator_from_settings( postfix: str = "processing_time", registry: CollectorRegistry = REGISTRY, ) -> Decorator: - """Make a decorator for monitoring the processing time of a function. The decorator is only applied if the - prometheus metrics are enabled in the settings. - This, and other metrics should follow the convention - {product name}_{service name}_{processing step / parameter to monitor}. + """Make a decorator for monitoring the processing time of a function. This, and other metrics should follow the + convention {product name}_{service name}_{processing step / parameter to monitor}. 
""" validate_settings(settings, validators=prometheus_validators) - if not settings.metrics.prometheus.enabled: - return identity - processing_time_sum = Summary( f"{settings.metrics.prometheus.prefix}_{postfix}", "Summed up processing time per call.", diff --git a/pyinfra/webserver/utils.py b/pyinfra/webserver/utils.py index fc8a16c..ffe72c5 100644 --- a/pyinfra/webserver/utils.py +++ b/pyinfra/webserver/utils.py @@ -8,11 +8,15 @@ from fastapi import FastAPI from pyinfra.config.validators import webserver_validators from pyinfra.config.loader import validate_settings +from pyinfra.utils.opentelemetry import setup_trace, instrument_app def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread: validate_settings(settings, validators=webserver_validators) + if settings.tracing.opentelemetry.enabled: + return create_webserver_thread_with_tracing(app, settings) + return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host) @@ -25,6 +29,19 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr return thread +def create_webserver_thread_with_tracing(app: FastAPI, settings: Dynaconf) -> threading.Thread: + + def inner(): + setup_trace(settings) + instrument_app(app) + uvicorn.run(app, port=settings.webserver.port, host=settings.webserver.host, log_level=logging.WARNING) + + thread = threading.Thread(target=inner) + thread.daemon = True + + return thread + + HealthFunction = Callable[[], bool] diff --git a/tests/unit_test/opentelemetry_test.py b/tests/unit_test/opentelemetry_test.py index a38dbbc..e624b18 100644 --- a/tests/unit_test/opentelemetry_test.py +++ b/tests/unit_test/opentelemetry_test.py @@ -1,28 +1,8 @@ from time import sleep import pytest -import requests -from fastapi import FastAPI -from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika, instrument_app -from pyinfra.webserver.utils import create_webserver_thread_from_settings 
- - -@pytest.fixture(scope="class") -def app_with_tracing(settings): - app = FastAPI() - - @app.get("/test") - def test(): - return {"flat": "earth"} - - instrument_app(app) - - thread = create_webserver_thread_from_settings(app, settings) - thread.start() - sleep(1) - yield - thread.join(timeout=1) +from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika @pytest.fixture(scope="session") @@ -33,7 +13,6 @@ def exporter(settings): class TestOpenTelemetry: def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter): - setup_trace(settings, exporter=exporter) instrument_pika() @@ -53,11 +32,20 @@ class TestOpenTelemetry: exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name ) - def test_webserver_requests_are_traced(self, settings, app_with_tracing, exporter): - settings.tracing.opentelemetry.exporter = "json" - - setup_trace(settings, exporter=exporter) - - requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test") - - print(exporter.traces) + # def test_webserver_requests_are_traced(self, settings): + # settings.tracing.opentelemetry.exporter = "console" + # settings.tracing.opentelemetry.enabled = True + # + # app = FastAPI() + # + # @app.get("/test") + # def test(): + # return {"test": "test"} + # + # thread = create_webserver_thread_from_settings(app, settings) + # thread.start() + # sleep(1) + # + # requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test") + # + # thread.join(timeout=1)