39 lines
1.7 KiB
Python
39 lines
1.7 KiB
Python
from dynaconf import Dynaconf
|
|
from fastapi import FastAPI
|
|
from kn_utils.logging import logger
|
|
|
|
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
|
|
from pyinfra.queue.manager import QueueManager
|
|
from pyinfra.webserver.prometheus import add_prometheus_endpoint, \
|
|
make_prometheus_processing_time_decorator_from_settings
|
|
from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings
|
|
|
|
|
|
def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf):
    """Run the standard serving setup used by research services.

    A FastAPI app exposing the /health, /ready and /prometheus endpoints is
    started on a webserver thread, and the processing time of every
    ``process_fn`` call is recorded for Prometheus.

    Work arrives exclusively through queue messages. Each message carries the
    file path of the data to process; that data is downloaded from the storage
    and handed to ``process_fn`` together with the message. ``process_fn`` is
    expected to return a JSON-serializable object, which is uploaded back to
    the storage. The response message is simply the original message.

    Adapt as needed.

    Args:
        process_fn: Callable that processes the downloaded data and the message.
        settings: Settings used to configure the Prometheus decorator, the
            queue manager, the webserver thread and the storage callback.
    """
    logger.info("Starting webserver and queue consumer...")

    # Metrics: expose the Prometheus endpoint and time each process_fn call.
    api = add_prometheus_endpoint(FastAPI())
    measure_processing_time = make_prometheus_processing_time_decorator_from_settings(settings)
    timed_process_fn = measure_processing_time(process_fn)

    # Health endpoints report the queue manager's readiness.
    manager = QueueManager(settings)
    api = add_health_check_endpoint(api, manager.is_ready)

    # Start the webserver thread before handing control to the consumer.
    create_webserver_thread_from_settings(api, settings).start()

    # Download -> process -> upload pipeline, driven by incoming queue messages.
    manager.start_consuming(make_download_process_upload_callback(timed_process_fn, settings))
|