pyinfra/tests/unit_test/opentelemetry_test.py
2024-01-24 14:26:10 +01:00

64 lines
1.7 KiB
Python

from time import sleep
import pytest
import requests
from fastapi import FastAPI
from pyinfra.utils.opentelemetry import get_exporter, setup_trace, instrument_pika, instrument_app
from pyinfra.webserver.utils import create_webserver_thread_from_settings
@pytest.fixture(scope="class")
def app_with_tracing(settings):
app = FastAPI()
@app.get("/test")
def test():
return {"flat": "earth"}
instrument_app(app)
thread = create_webserver_thread_from_settings(app, settings)
thread.start()
sleep(1)
yield
thread.join(timeout=1)
@pytest.fixture(scope="session")
def exporter(settings):
settings.tracing.opentelemetry.exporter = "json"
return get_exporter(settings)
class TestOpenTelemetry:
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
setup_trace(settings, exporter=exporter)
instrument_pika()
queue_manager.purge_queues()
queue_manager.publish_message_to_input_queue(input_message)
queue_manager.publish_message_to_input_queue(stop_message)
def callback(_):
sleep(2)
return {"flat": "earth"}
queue_manager.start_consuming(callback)
for exported_trace in exporter.traces:
assert (
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
)
def test_webserver_requests_are_traced(self, settings, app_with_tracing, exporter):
settings.tracing.opentelemetry.exporter = "json"
setup_trace(settings, exporter=exporter)
requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test")
print(exporter.traces)