pyinfra/tests/integration_test/opentelemetry_test.py
2024-11-18 17:31:15 +01:00

42 lines
1.3 KiB
Python

from time import sleep
import pytest
from pyinfra.utils.opentelemetry import get_exporter, instrument_pika, setup_trace
@pytest.fixture(scope="session")
def exporter(settings):
    """Provide the trace exporter shared by all tests of the session.

    The OpenTelemetry exporter setting is switched to ``"json"`` on the
    given ``settings`` object before the exporter is built, so the mutation
    is visible to every other user of that same ``settings`` object.

    Returns:
        Whatever ``get_exporter`` builds for the modified settings.
    """
    otel_settings = settings.tracing.opentelemetry
    otel_settings.exporter = "json"
    return get_exporter(settings)
@pytest.fixture(autouse=True)
def setup_test_trace(settings, exporter, tracing_type):
    """Configure tracing before each test using the requested tracing type.

    Because the fixture is ``autouse``, it runs for every test in scope and
    therefore requires each such test to supply a ``tracing_type`` value
    (e.g. through parametrization).

    Args:
        settings: Settings object whose ``tracing.type`` is overwritten.
        exporter: Exporter handed to ``setup_trace``.
        tracing_type: Value written to ``settings.tracing.type``.
    """
    tracing_settings = settings.tracing
    tracing_settings.type = tracing_type
    setup_trace(settings, exporter=exporter)
class TestOpenTelemetry:
    """Integration tests for tracing of messages consumed from the queue."""

    # The xfail mark is attached only to the ``azure_monitor`` parameter: the
    # stated reason applies to that backend alone, and marking the whole test
    # would also hide genuine failures of the ``opentelemetry`` case.
    @pytest.mark.parametrize(
        "tracing_type",
        [
            "opentelemetry",
            pytest.param(
                "azure_monitor",
                marks=pytest.mark.xfail(
                    reason="Azure Monitor requires a connection string. Therefore the test is allowed to fail in this case."
                ),
            ),
        ],
    )
    def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
        """Check that every exported trace carries the configured service name.

        An input message followed by a stop message is published to the input
        queue, the queue is consumed with a dummy callback, and afterwards each
        trace collected by ``exporter`` is compared against
        ``settings.tracing.opentelemetry.service_name``.
        """
        instrument_pika()

        # Start from empty queues so only the two messages below are consumed.
        queue_manager.purge_queues()
        queue_manager.publish_message_to_input_queue(input_message)
        # NOTE(review): presumably the stop message makes ``start_consuming``
        # return - confirm against the queue manager implementation.
        queue_manager.publish_message_to_input_queue(stop_message)

        def callback(_):
            """Simulate two seconds of processing and return a dummy result."""
            sleep(2)
            return {"flat": "earth"}

        queue_manager.start_consuming(callback)

        # NOTE(review): this loop passes vacuously when ``exporter.traces`` is
        # empty; consider asserting that at least one trace was exported once
        # the export timing has been verified.
        for exported_trace in exporter.traces:
            assert (
                exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
            )