pyinfra/pyinfra/examples.py
2024-09-25 11:58:51 +02:00

139 lines
5.1 KiB
Python

import asyncio
import sys
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
from pyinfra.queue.callback import Callback
from pyinfra.queue.manager import QueueManager
from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace
from pyinfra.webserver.prometheus import (
add_prometheus_endpoint,
make_prometheus_processing_time_decorator_from_settings,
)
from pyinfra.webserver.utils import (
add_health_check_endpoint,
create_webserver_thread_from_settings,
run_async_webserver,
)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((Exception,)), # You might want to be more specific here
reraise=True,
)
async def run_async_queues(manager, app, port, host):
"""Run the async webserver and the async queue manager concurrently."""
try:
await manager.run()
await run_async_webserver(app, port, host)
except asyncio.CancelledError:
logger.info("Main task is cancelled.")
except Exception as e:
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
sys.exit(1)
finally:
logger.info("Signal received, shutting down...")
await manager.shutdown()
# async def run_async_queues(manager, app, port, host):
# server = None
# try:
# await manager.run()
# server = await asyncio.start_server(app, host, port)
# await server.serve_forever()
# except Exception as e:
# logger.error(f"An error occurred while running async queues: {e}")
# finally:
# if server:
# server.close()
# await server.wait_closed()
# await manager.shutdown()
def start_standard_queue_consumer(
callback: Callback,
settings: Dynaconf,
app: FastAPI = None,
):
"""Default serving logic for research services.
Supplies /health, /ready and /prometheus endpoints (if enabled). The callback is monitored for processing time per
message. Also traces the queue messages via openTelemetry (if enabled).
Workload is received via queue messages and processed by the callback function (see pyinfra.queue.callback for
callbacks).
"""
validate_settings(settings, get_pyinfra_validators())
logger.info("Starting webserver and queue consumer...")
app = app or FastAPI()
if settings.metrics.prometheus.enabled:
logger.info("Prometheus metrics enabled.")
app = add_prometheus_endpoint(app)
callback = make_prometheus_processing_time_decorator_from_settings(settings)(callback)
if settings.tracing.enabled:
setup_trace(settings)
instrument_pika(dynamic_queues=settings.dynamic_tenant_queues.enabled)
instrument_app(app)
if settings.dynamic_tenant_queues.enabled:
logger.info("Dynamic tenant queues enabled. Running async queues.")
config = RabbitMQConfig(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
username=settings.rabbitmq.username,
password=settings.rabbitmq.password,
heartbeat=settings.rabbitmq.heartbeat,
input_queue_prefix=settings.rabbitmq.service_request_queue_prefix,
tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix,
tenant_exchange_name=settings.rabbitmq.tenant_exchange_name,
service_request_exchange_name=settings.rabbitmq.service_request_exchange_name,
service_response_exchange_name=settings.rabbitmq.service_response_exchange_name,
service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name,
queue_expiration_time=settings.rabbitmq.queue_expiration_time,
pod_name=settings.kubernetes.pod_name,
)
manager = AsyncQueueManager(
config=config,
tenant_service_url=settings.storage.tenant_server.endpoint,
message_processor=callback,
max_concurrent_tasks=(
settings.asyncio.max_concurrent_tasks if hasattr(settings.asyncio, "max_concurrent_tasks") else 10
),
)
else:
logger.info("Dynamic tenant queues disabled. Running sync queues.")
manager = QueueManager(settings)
app = add_health_check_endpoint(app, manager.is_ready)
if isinstance(manager, AsyncQueueManager):
asyncio.run(run_async_queues(manager, app, port=settings.webserver.port, host=settings.webserver.host))
elif isinstance(manager, QueueManager):
webserver = create_webserver_thread_from_settings(app, settings)
webserver.start()
try:
manager.start_consuming(callback)
except Exception as e:
logger.error(f"An error occurred while consuming messages: {e}", exc_info=True)
sys.exit(1)
else:
logger.warning(f"Behavior for type {type(manager)} is not defined")