"""Tests that messages consumed from the queue are exported as traces."""

from time import sleep

import pytest

from pyinfra.utils.opentelemetry import get_exporter, instrument_pika, setup_trace


@pytest.fixture(scope="session")
def exporter(settings):
    """Return the exporter built from ``settings`` with the exporter kind forced to ``"json"``.

    Note that this mutates the shared ``settings`` object, and the returned
    exporter is reused for the whole test session.
    """
    settings.tracing.opentelemetry.exporter = "json"
    return get_exporter(settings)


@pytest.fixture(autouse=True)
def setup_test_trace(settings, exporter, tracing_type):
    """Configure tracing for the current test using the parametrized ``tracing_type``.

    ``tracing_type`` is supplied by the ``parametrize`` mark on the test; this
    autouse fixture writes it into ``settings`` and then calls ``setup_trace``
    with the session exporter.
    """
    settings.tracing.type = tracing_type
    setup_trace(settings, exporter=exporter)


class TestOpenTelemetry:
    """Tracing tests run once per supported tracing backend."""

    @pytest.mark.parametrize(
        "tracing_type",
        [
            "opentelemetry",
            # Only the Azure Monitor case is expected to fail; scoping the
            # xfail to this parameter keeps real failures of the plain
            # OpenTelemetry case visible instead of masking them.
            pytest.param(
                "azure_monitor",
                marks=pytest.mark.xfail(
                    reason="Azure Monitor requires a connection string. Therefore the test is allowed to fail in this case."
                ),
            ),
        ],
    )
    def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
        """Publish an input and a stop message, consume them, and check the exported traces.

        Every trace found on ``exporter.traces`` must carry the service name
        configured in ``settings.tracing.opentelemetry.service_name``.
        """
        instrument_pika()

        # Start from empty queues, then enqueue the payload followed by the stop message.
        queue_manager.purge_queues()
        queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        def callback(_):
            # NOTE(review): presumably the delay simulates processing time so the
            # span has a measurable duration - confirm whether 2 s is required.
            sleep(2)
            return {"flat": "earth"}

        queue_manager.start_consuming(callback)

        # NOTE(review): this loop passes vacuously when no traces were exported;
        # consider asserting non-emptiness once it is confirmed that the exporter
        # has flushed by the time consumption returns.
        expected_service_name = settings.tracing.opentelemetry.service_name
        for exported_trace in exporter.traces:
            assert exported_trace["resource"]["attributes"]["service.name"] == expected_service_name