import time from pathlib import Path from typing import Union from fastapi import FastAPI from pyinfra.config.loader import load_settings from pyinfra.queue.callback import make_queue_message_callback from pyinfra.queue.manager import QueueManager from pyinfra.webserver.prometheus import ( make_prometheus_processing_time_decorator_from_settings, add_prometheus_endpoint, ) from pyinfra.webserver.utils import create_webserver_thread_from_settings, add_health_check_endpoint def processor_mock(_data: dict, _message: dict) -> dict: time.sleep(5) return {"result1": "result1"} def start_serving(process_fn, settings_path: Union[str, Path] = None): settings = load_settings(settings_path) app = FastAPI() app = add_prometheus_endpoint(app) process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) queue_manager = QueueManager(settings) app = add_health_check_endpoint(app, queue_manager.is_ready) webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread.start() callback = make_queue_message_callback(process_fn, settings) queue_manager.start_consuming(callback) if __name__ == "__main__": start_serving(processor_mock)