pyinfra/tests/unit_test/opentelemetry_test.py
2024-06-12 10:41:52 +02:00

52 lines
1.5 KiB
Python

from time import sleep
import pytest
from pyinfra.utils.opentelemetry import get_exporter, instrument_pika, setup_trace
@pytest.fixture(scope="session")
def exporter(settings):
    """Return the exporter built by ``get_exporter`` with the "json" exporter selected.

    The assignment below mutates the ``settings`` object in place and is not
    undone afterwards; the fixture is session-scoped, so it runs once per
    test session.
    """
    otel_settings = settings.tracing.opentelemetry
    otel_settings.exporter = "json"
    return get_exporter(settings)
class TestOpenTelemetry:
    """Tracing tests for the OpenTelemetry helpers in ``pyinfra.utils.opentelemetry``."""

    def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
        """Consume queued messages and check the service name on every exported trace.

        Tracing is set up with the supplied exporter and pika is instrumented,
        then an input message and a stop message are published and consumed
        with a callback. Each entry in ``exporter.traces`` must report the
        service name configured in ``settings``.
        """

        def callback(_):
            # NOTE(review): the reason for the 2 s delay is not visible here —
            # presumably it gives the span some measurable duration; confirm.
            sleep(2)
            return {"flat": "earth"}

        setup_trace(settings, exporter=exporter)
        instrument_pika()

        # Start from empty queues so only the two messages below are consumed.
        queue_manager.purge_queues()
        for message in (input_message, stop_message):
            queue_manager.publish_message_to_input_queue(message)

        queue_manager.start_consuming(callback)

        # NOTE(review): this loop passes vacuously when exporter.traces is
        # empty — consider also asserting that at least one trace was exported.
        for span in exporter.traces:
            reported_service_name = span["resource"]["attributes"]["service.name"]
            assert reported_service_name == settings.tracing.opentelemetry.service_name
# def test_webserver_requests_are_traced(self, settings):
# settings.tracing.opentelemetry.exporter = "console"
# settings.tracing.enabled = True
#
# app = FastAPI()
#
# @app.get("/test")
# def test():
# return {"test": "test"}
#
# thread = create_webserver_thread_from_settings(app, settings)
# thread.start()
# sleep(1)
#
# requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test")
#
# thread.join(timeout=1)