from time import sleep import pytest from pyinfra.utils.opentelemetry import get_exporter, instrument_pika, setup_trace @pytest.fixture(scope="session") def exporter(settings): settings.tracing.opentelemetry.exporter = "json" return get_exporter(settings) class TestOpenTelemetry: def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter): setup_trace(settings, exporter=exporter) instrument_pika() queue_manager.purge_queues() queue_manager.publish_message_to_input_queue(input_message) queue_manager.publish_message_to_input_queue(stop_message) def callback(_): sleep(2) return {"flat": "earth"} queue_manager.start_consuming(callback) for exported_trace in exporter.traces: assert ( exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name ) # def test_webserver_requests_are_traced(self, settings): # settings.tracing.opentelemetry.exporter = "console" # settings.tracing.enabled = True # # app = FastAPI() # # @app.get("/test") # def test(): # return {"test": "test"} # # thread = create_webserver_thread_from_settings(app, settings) # thread.start() # sleep(1) # # requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test") # # thread.join(timeout=1)