Add serving example

TODO: - update readme
      - check if logs are adequate
This commit is contained in:
Julius Unverfehrt 2024-01-19 14:53:02 +01:00
parent 8cd1d6b283
commit 73eba97ede
3 changed files with 53 additions and 30 deletions

View File

@ -2,13 +2,13 @@ import os
from pathlib import Path from pathlib import Path
from typing import Union from typing import Union
import funcy
from dynaconf import Dynaconf, ValidationError from dynaconf import Dynaconf, ValidationError
from funcy import merge, lflatten from funcy import lflatten
from kn_utils.logging import logger from kn_utils.logging import logger
def load_settings(settings_path: Union[str, Path] = None): def load_settings(settings_path: Union[str, Path] = None):
if not settings_path: if not settings_path:
repo_root_path = Path(__file__).resolve().parents[2] repo_root_path = Path(__file__).resolve().parents[2]
settings_path = repo_root_path / "config/" settings_path = repo_root_path / "config/"

35
pyinfra/examples.py Normal file
View File

@ -0,0 +1,35 @@
from dynaconf import Dynaconf
from fastapi import FastAPI
from pyinfra.queue.callback import make_queue_message_callback, DataProcessor
from pyinfra.queue.manager import QueueManager
from pyinfra.webserver.prometheus import add_prometheus_endpoint, \
make_prometheus_processing_time_decorator_from_settings
from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings
def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf):
"""Default serving logic for research services.
Supplies /health, /ready and /prometheus endpoints. The process_fn is monitored for processing time per call.
Workload is only received via queue messages. The message contains a file path to the data to be processed, which
gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should
return a json-dump-able object. This object is then uploaded to the storage. The response message is just the
original message.
Adapt as needed.
"""
app = FastAPI()
app = add_prometheus_endpoint(app)
process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn)
queue_manager = QueueManager(settings)
app = add_health_check_endpoint(app, queue_manager.is_ready)
webserver_thread = create_webserver_thread_from_settings(app, settings)
webserver_thread.start()
callback = make_queue_message_callback(process_fn, settings)
queue_manager.start_consuming(callback)

View File

@ -1,17 +1,21 @@
import argparse
import time import time
from pathlib import Path from pathlib import Path
from typing import Union
from fastapi import FastAPI
from pyinfra.config.loader import load_settings from pyinfra.config.loader import load_settings
from pyinfra.queue.callback import make_queue_message_callback from pyinfra.examples import start_queue_consumer_with_prometheus_and_health_endpoints
from pyinfra.queue.manager import QueueManager
from pyinfra.webserver.prometheus import (
make_prometheus_processing_time_decorator_from_settings, def parse_args():
add_prometheus_endpoint, parser = argparse.ArgumentParser()
) parser.add_argument(
from pyinfra.webserver.utils import create_webserver_thread_from_settings, add_health_check_endpoint "--settings_path",
"-s",
type=Path,
default=None,
help="Path to settings file or folder. Must be a .toml file or a folder containing .toml files.",
)
return parser.parse_args()
def processor_mock(_data: dict, _message: dict) -> dict: def processor_mock(_data: dict, _message: dict) -> dict:
@ -19,22 +23,6 @@ def processor_mock(_data: dict, _message: dict) -> dict:
return {"result1": "result1"} return {"result1": "result1"}
def start_serving(process_fn, settings_path: Union[str, Path] = None):
settings = load_settings(settings_path)
app = FastAPI()
app = add_prometheus_endpoint(app)
process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn)
queue_manager = QueueManager(settings)
app = add_health_check_endpoint(app, queue_manager.is_ready)
webserver_thread = create_webserver_thread_from_settings(app, settings)
webserver_thread.start()
callback = make_queue_message_callback(process_fn, settings)
queue_manager.start_consuming(callback)
if __name__ == "__main__": if __name__ == "__main__":
start_serving(processor_mock) settings = load_settings(parse_args().settings_path)
start_queue_consumer_with_prometheus_and_health_endpoints(processor_mock, settings)