diff --git a/pyinfra/config/validators.py b/pyinfra/config/validators.py index 4db9f84..bb1204b 100644 --- a/pyinfra/config/validators.py +++ b/pyinfra/config/validators.py @@ -46,6 +46,6 @@ webserver_validators = [ ] opentelemetry_validators = [ - Validator("tracing.endpoint", must_exist=True, is_type_of=str), - Validator("tracing.service.name", must_exist=True, is_type_of=str), + Validator("tracing.opentelemetry.endpoint", must_exist=True, is_type_of=str), + Validator("tracing.opentelemetry.service_name", must_exist=True, is_type_of=str), ] diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index f23a179..e284438 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -10,12 +10,6 @@ import pika import pika.exceptions from dynaconf import Dynaconf from kn_utils.logging import logger -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.instrumentation.pika import PikaInstrumentor -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection from retry import retry @@ -42,9 +36,6 @@ class QueueManager: self.channel: Union[BlockingChannel, None] = None self.connection_sleep = settings.rabbitmq.connection_sleep - self.tracing_endpoint = settings.tracing.endpoint - self.service_name = settings.tracing.service_name - atexit.register(self.stop_consuming) signal.signal(signal.SIGTERM, self._handle_stop_signal) signal.signal(signal.SIGINT, self._handle_stop_signal) @@ -85,17 +76,6 @@ class QueueManager: logger.info("Connection to RabbitMQ established, channel open.") - resource = Resource(attributes={"service.name": self.service_name}) - trace.set_tracer_provider(TracerProvider(resource=resource)) - tracer = trace.get_tracer(__name__) - - otlp_exporter = OTLPSpanExporter(endpoint=self.tracing_endpoint) - span_processor = BatchSpanProcessor(otlp_exporter) - trace.get_tracer_provider().add_span_processor(span_processor) - - pika_instrumentation = PikaInstrumentor() - pika_instrumentation.instrument_channel(channel=self.channel) - def is_ready(self): self.establish_connection() return self.channel.is_open diff --git a/tests/conftest.py b/tests/conftest.py index 4939c83..796a508 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,9 @@ +import json + import pytest from pyinfra.config.loader import load_settings, pyinfra_config_path +from pyinfra.queue.manager import QueueManager from pyinfra.storage.connection import get_storage_from_settings @@ -18,3 +21,26 @@ def storage(storage_backend, settings): yield storage storage.clear_bucket() + + +@pytest.fixture(scope="session") +def queue_manager(settings): + settings.rabbitmq_heartbeat = 10 + settings.connection_sleep = 5 + queue_manager = QueueManager(settings) + yield queue_manager + + +@pytest.fixture +def input_message(): + return json.dumps( + { + "targetFilePath": "test/target.json.gz", + "responseFilePath": "test/response.json.gz", + } + ) + + +@pytest.fixture +def stop_message(): + return "STOP" diff --git a/tests/unit_test/opentelemetry_test.py b/tests/unit_test/opentelemetry_test.py new file mode 100644 index 0000000..a21c563 --- /dev/null +++ b/tests/unit_test/opentelemetry_test.py @@ -0,0 +1,58 @@ +import json +from time import sleep + +from opentelemetry.instrumentation.pika import PikaInstrumentor +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SpanExportResult +from opentelemetry import trace + + +class MySpanExporter(SpanExporter): + def __init__(self): + self.traces = [] + + def export(self, spans): + for span in spans: + self.traces.append(span.to_json()) + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + +class TestOpenTelemetry: + def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message): + service_name = "deine-mutter-serivce" + + resource = Resource(attributes={"service.name": service_name}) + provider = TracerProvider(resource=resource) + # processor = BatchSpanProcessor(ConsoleSpanExporter()) + exporter = MySpanExporter() + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) + # otlp_exporter = OTLPSpanExporter(endpoint=self.tracing_endpoint) + # span_processor = BatchSpanProcessor(otlp_exporter) + # trace.get_tracer_provider().add_span_processor(span_processor) + + # Sets the global default tracer provider + trace.set_tracer_provider(provider) + + # Creates a tracer from the global tracer provider + tracer = trace.get_tracer("my.tracer.name") + + PikaInstrumentor().instrument() + + queue_manager.purge_queues() + queue_manager.publish_message_to_input_queue(input_message) + queue_manager.publish_message_to_input_queue(stop_message) + + def callback(_): + sleep(2) + return {"flat": "earth"} + + queue_manager.start_consuming(callback) + + for exported_trace in exporter.traces: + exported_trace = json.loads(exported_trace) + assert exported_trace["resource"]["attributes"]["service.name"] == service_name \ No newline at end of file diff --git a/tests/unit_test/queue_test.py b/tests/unit_test/queue_test.py index 720c6c3..492b24f 100644 --- a/tests/unit_test/queue_test.py +++ b/tests/unit_test/queue_test.py @@ -6,8 +6,6 @@ import pika import pytest from kn_utils.logging import logger -from pyinfra.queue.manager import QueueManager - logger.remove() logger.add(sink=stdout, level="DEBUG") @@ -20,27 +18,6 @@ def make_callback(process_time): return callback -@pytest.fixture(scope="session") -def queue_manager(settings): - settings.rabbitmq_heartbeat = 10 - settings.connection_sleep = 5 - queue_manager = QueueManager(settings) - yield queue_manager - - -@pytest.fixture -def input_message(): - return json.dumps({ - "targetFilePath": "test/target.json.gz", - "responseFilePath": "test/response.json.gz", - }) - - -@pytest.fixture -def stop_message(): - return "STOP" - - class TestQueueManager: def test_processing_of_several_messages(self, queue_manager, input_message, stop_message): queue_manager.purge_queues()