diff --git a/pyinfra/utils/opentelemetry.py b/pyinfra/utils/opentelemetry.py new file mode 100644 index 0000000..3c8f46e --- /dev/null +++ b/pyinfra/utils/opentelemetry.py @@ -0,0 +1,64 @@ +import json + +from dynaconf import Dynaconf +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.pika import PikaInstrumentor +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult + +from pyinfra.config.loader import validate_settings +from pyinfra.config.validators import opentelemetry_validators + + +class JsonSpanExporter(SpanExporter): + def __init__(self): + self.traces = [] + + def export(self, spans): + for span in spans: + self.traces.append(json.loads(span.to_json())) + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + +def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExporter = None): + service_name = service_name or settings.tracing.opentelemetry.service_name + exporter = exporter or get_exporter(settings) + + resource = Resource(attributes={"service.name": service_name}) + provider = TracerProvider(resource=resource) + + processor = BatchSpanProcessor(exporter) + + provider.add_span_processor(processor) + + trace.set_tracer_provider(provider) + + +def get_exporter(settings: Dynaconf): + validate_settings(settings, validators=opentelemetry_validators) + + if settings.tracing.opentelemetry.exporter == "json": + return JsonSpanExporter() + elif settings.tracing.opentelemetry.exporter == "otlp": + return OTLPSpanExporter(endpoint=settings.metrics.opentelemetry.endpoint) + elif settings.tracing.opentelemetry.exporter == "console": + return ConsoleSpanExporter() + else: + raise ValueError( + f"Invalid OpenTelemetry exporter {settings.tracing.opentelemetry.exporter}. " + f"Valid values are 'json', 'otlp' and 'console'." + ) + + +def instrument_pika(): + PikaInstrumentor().instrument() + + +# def instrument_app(app: FastAPI): +# FastAPIInstrumentor().instrument_app(app) diff --git a/tests/unit_test/opentelemetry_test.py b/tests/unit_test/opentelemetry_test.py index a21c563..a1c2599 100644 --- a/tests/unit_test/opentelemetry_test.py +++ b/tests/unit_test/opentelemetry_test.py @@ -1,47 +1,16 @@ -import json from time import sleep -from opentelemetry.instrumentation.pika import PikaInstrumentor -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SpanExportResult -from opentelemetry import trace - - -class MySpanExporter(SpanExporter): - def __init__(self): - self.traces = [] - - def export(self, spans): - for span in spans: - self.traces.append(span.to_json()) - return SpanExportResult.SUCCESS - - def shutdown(self): - pass +from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika class TestOpenTelemetry: - def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message): - service_name = "deine-mutter-serivce" + def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings): + settings.tracing.opentelemetry.exporter = "json" - resource = Resource(attributes={"service.name": service_name}) - provider = TracerProvider(resource=resource) - # processor = BatchSpanProcessor(ConsoleSpanExporter()) - exporter = MySpanExporter() - processor = BatchSpanProcessor(exporter) - provider.add_span_processor(processor) - # otlp_exporter = OTLPSpanExporter(endpoint=self.tracing_endpoint) - # span_processor = BatchSpanProcessor(otlp_exporter) - # trace.get_tracer_provider().add_span_processor(span_processor) + exporter = get_exporter(settings) + setup_trace(settings, exporter=exporter) - # Sets the global default tracer provider - trace.set_tracer_provider(provider) - - # Creates a tracer from the global tracer provider - tracer = trace.get_tracer("my.tracer.name") - - PikaInstrumentor().instrument() + instrument_pika() queue_manager.purge_queues() queue_manager.publish_message_to_input_queue(input_message) @@ -54,5 +23,6 @@ class TestOpenTelemetry: queue_manager.start_consuming(callback) for exported_trace in exporter.traces: - exported_trace = json.loads(exported_trace) - assert exported_trace["resource"]["attributes"]["service.name"] == service_name \ No newline at end of file + assert ( + exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name + )