feat(opentelemetry): put logic in own module
This commit is contained in:
parent
739a7c0731
commit
a415666830
64
pyinfra/utils/opentelemetry.py
Normal file
64
pyinfra/utils/opentelemetry.py
Normal file
@ -0,0 +1,64 @@
|
||||
import json
|
||||
|
||||
from dynaconf import Dynaconf
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.instrumentation.pika import PikaInstrumentor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
|
||||
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
|
||||
|
||||
from pyinfra.config.loader import validate_settings
|
||||
from pyinfra.config.validators import opentelemetry_validators
|
||||
|
||||
|
||||
class JsonSpanExporter(SpanExporter):
|
||||
def __init__(self):
|
||||
self.traces = []
|
||||
|
||||
def export(self, spans):
|
||||
for span in spans:
|
||||
self.traces.append(json.loads(span.to_json()))
|
||||
return SpanExportResult.SUCCESS
|
||||
|
||||
def shutdown(self):
|
||||
pass
|
||||
|
||||
|
||||
def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExporter = None):
|
||||
service_name = service_name or settings.tracing.opentelemetry.service_name
|
||||
exporter = exporter or get_exporter(settings)
|
||||
|
||||
resource = Resource(attributes={"service.name": service_name})
|
||||
provider = TracerProvider(resource=resource)
|
||||
|
||||
processor = BatchSpanProcessor(exporter)
|
||||
|
||||
provider.add_span_processor(processor)
|
||||
|
||||
trace.set_tracer_provider(provider)
|
||||
|
||||
|
||||
def get_exporter(settings: Dynaconf):
|
||||
validate_settings(settings, validators=opentelemetry_validators)
|
||||
|
||||
if settings.tracing.opentelemetry.exporter == "json":
|
||||
return JsonSpanExporter()
|
||||
elif settings.tracing.opentelemetry.exporter == "otlp":
|
||||
return OTLPSpanExporter(endpoint=settings.metrics.opentelemetry.endpoint)
|
||||
elif settings.tracing.opentelemetry.exporter == "console":
|
||||
return ConsoleSpanExporter()
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid OpenTelemetry exporter {settings.tracing.opentelemetry.exporter}. "
|
||||
f"Valid values are 'json', 'otlp' and 'console'."
|
||||
)
|
||||
|
||||
|
||||
def instrument_pika():
|
||||
PikaInstrumentor().instrument()
|
||||
|
||||
|
||||
# def instrument_app(app: FastAPI):
|
||||
# FastAPIInstrumentor().instrument_app(app)
|
||||
@ -1,47 +1,16 @@
|
||||
import json
|
||||
from time import sleep
|
||||
|
||||
from opentelemetry.instrumentation.pika import PikaInstrumentor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SpanExportResult
|
||||
from opentelemetry import trace
|
||||
|
||||
|
||||
class MySpanExporter(SpanExporter):
|
||||
def __init__(self):
|
||||
self.traces = []
|
||||
|
||||
def export(self, spans):
|
||||
for span in spans:
|
||||
self.traces.append(span.to_json())
|
||||
return SpanExportResult.SUCCESS
|
||||
|
||||
def shutdown(self):
|
||||
pass
|
||||
from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika
|
||||
|
||||
|
||||
class TestOpenTelemetry:
|
||||
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message):
|
||||
service_name = "deine-mutter-serivce"
|
||||
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings):
|
||||
settings.tracing.opentelemetry.exporter = "json"
|
||||
|
||||
resource = Resource(attributes={"service.name": service_name})
|
||||
provider = TracerProvider(resource=resource)
|
||||
# processor = BatchSpanProcessor(ConsoleSpanExporter())
|
||||
exporter = MySpanExporter()
|
||||
processor = BatchSpanProcessor(exporter)
|
||||
provider.add_span_processor(processor)
|
||||
# otlp_exporter = OTLPSpanExporter(endpoint=self.tracing_endpoint)
|
||||
# span_processor = BatchSpanProcessor(otlp_exporter)
|
||||
# trace.get_tracer_provider().add_span_processor(span_processor)
|
||||
exporter = get_exporter(settings)
|
||||
setup_trace(settings, exporter=exporter)
|
||||
|
||||
# Sets the global default tracer provider
|
||||
trace.set_tracer_provider(provider)
|
||||
|
||||
# Creates a tracer from the global tracer provider
|
||||
tracer = trace.get_tracer("my.tracer.name")
|
||||
|
||||
PikaInstrumentor().instrument()
|
||||
instrument_pika()
|
||||
|
||||
queue_manager.purge_queues()
|
||||
queue_manager.publish_message_to_input_queue(input_message)
|
||||
@ -54,5 +23,6 @@ class TestOpenTelemetry:
|
||||
queue_manager.start_consuming(callback)
|
||||
|
||||
for exported_trace in exporter.traces:
|
||||
exported_trace = json.loads(exported_trace)
|
||||
assert exported_trace["resource"]["attributes"]["service.name"] == service_name
|
||||
assert (
|
||||
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user