feat(opentelemetry): add queue instrumenting test

This commit is contained in:
Julius Unverfehrt 2024-01-24 12:10:46 +01:00
parent 936bb4fe80
commit 739a7c0731
5 changed files with 86 additions and 45 deletions

View File

@ -46,6 +46,6 @@ webserver_validators = [
]
opentelemetry_validators = [
Validator("tracing.endpoint", must_exist=True, is_type_of=str),
Validator("tracing.service.name", must_exist=True, is_type_of=str),
Validator("tracing.opentelemetry.endpoint", must_exist=True, is_type_of=str),
Validator("tracing.opentelemetry.service_name", must_exist=True, is_type_of=str),
]

View File

@ -10,12 +10,6 @@ import pika
import pika.exceptions
from dynaconf import Dynaconf
from kn_utils.logging import logger
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from retry import retry
@ -42,9 +36,6 @@ class QueueManager:
self.channel: Union[BlockingChannel, None] = None
self.connection_sleep = settings.rabbitmq.connection_sleep
self.tracing_endpoint = settings.tracing.endpoint
self.service_name = settings.tracing.service_name
atexit.register(self.stop_consuming)
signal.signal(signal.SIGTERM, self._handle_stop_signal)
signal.signal(signal.SIGINT, self._handle_stop_signal)
@ -85,17 +76,6 @@ class QueueManager:
logger.info("Connection to RabbitMQ established, channel open.")
resource = Resource(attributes={"service.name": self.service_name})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
otlp_exporter = OTLPSpanExporter(endpoint=self.tracing_endpoint)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=self.channel)
def is_ready(self):
self.establish_connection()
return self.channel.is_open

View File

@ -1,6 +1,9 @@
import json
import pytest
from pyinfra.config.loader import load_settings, pyinfra_config_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.connection import get_storage_from_settings
@ -18,3 +21,26 @@ def storage(storage_backend, settings):
yield storage
storage.clear_bucket()
@pytest.fixture(scope="session")
def queue_manager(settings):
settings.rabbitmq_heartbeat = 10
settings.connection_sleep = 5
queue_manager = QueueManager(settings)
yield queue_manager
@pytest.fixture
def input_message():
return json.dumps(
{
"targetFilePath": "test/target.json.gz",
"responseFilePath": "test/response.json.gz",
}
)
@pytest.fixture
def stop_message():
return "STOP"

View File

@ -0,0 +1,58 @@
import json
from time import sleep
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SpanExportResult
from opentelemetry import trace
class MySpanExporter(SpanExporter):
def __init__(self):
self.traces = []
def export(self, spans):
for span in spans:
self.traces.append(span.to_json())
return SpanExportResult.SUCCESS
def shutdown(self):
pass
class TestOpenTelemetry:
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message):
service_name = "deine-mutter-serivce"
resource = Resource(attributes={"service.name": service_name})
provider = TracerProvider(resource=resource)
# processor = BatchSpanProcessor(ConsoleSpanExporter())
exporter = MySpanExporter()
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
# otlp_exporter = OTLPSpanExporter(endpoint=self.tracing_endpoint)
# span_processor = BatchSpanProcessor(otlp_exporter)
# trace.get_tracer_provider().add_span_processor(span_processor)
# Sets the global default tracer provider
trace.set_tracer_provider(provider)
# Creates a tracer from the global tracer provider
tracer = trace.get_tracer("my.tracer.name")
PikaInstrumentor().instrument()
queue_manager.purge_queues()
queue_manager.publish_message_to_input_queue(input_message)
queue_manager.publish_message_to_input_queue(stop_message)
def callback(_):
sleep(2)
return {"flat": "earth"}
queue_manager.start_consuming(callback)
for exported_trace in exporter.traces:
exported_trace = json.loads(exported_trace)
assert exported_trace["resource"]["attributes"]["service.name"] == service_name

View File

@ -6,8 +6,6 @@ import pika
import pytest
from kn_utils.logging import logger
from pyinfra.queue.manager import QueueManager
logger.remove()
logger.add(sink=stdout, level="DEBUG")
@ -20,27 +18,6 @@ def make_callback(process_time):
return callback
@pytest.fixture(scope="session")
def queue_manager(settings):
settings.rabbitmq_heartbeat = 10
settings.connection_sleep = 5
queue_manager = QueueManager(settings)
yield queue_manager
@pytest.fixture
def input_message():
return json.dumps({
"targetFilePath": "test/target.json.gz",
"responseFilePath": "test/response.json.gz",
})
@pytest.fixture
def stop_message():
return "STOP"
class TestQueueManager:
def test_processing_of_several_messages(self, queue_manager, input_message, stop_message):
queue_manager.purge_queues()