diff --git a/pyinfra/config/loader.py b/pyinfra/config/loader.py index a705927..56658f9 100644 --- a/pyinfra/config/loader.py +++ b/pyinfra/config/loader.py @@ -2,13 +2,13 @@ import os from pathlib import Path from typing import Union -import funcy from dynaconf import Dynaconf, ValidationError -from funcy import merge, lflatten +from funcy import lflatten from kn_utils.logging import logger def load_settings(settings_path: Union[str, Path] = None): + if not settings_path: repo_root_path = Path(__file__).resolve().parents[2] settings_path = repo_root_path / "config/" diff --git a/pyinfra/examples.py b/pyinfra/examples.py new file mode 100644 index 0000000..9297101 --- /dev/null +++ b/pyinfra/examples.py @@ -0,0 +1,35 @@ +from dynaconf import Dynaconf +from fastapi import FastAPI + +from pyinfra.queue.callback import make_queue_message_callback, DataProcessor +from pyinfra.queue.manager import QueueManager +from pyinfra.webserver.prometheus import add_prometheus_endpoint, \ + make_prometheus_processing_time_decorator_from_settings +from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings + + +def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf): + """Default serving logic for research services. + + Supplies /health, /ready and /prometheus endpoints. The process_fn is monitored for processing time per call. + Workload is only received via queue messages. The message contains a file path to the data to be processed, which + gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should + return a json-dump-able object. This object is then uploaded to the storage. The response message is just the + original message. + + Adapt as needed. + """ + app = FastAPI() + + app = add_prometheus_endpoint(app) + process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) + + queue_manager = QueueManager(settings) + + app = add_health_check_endpoint(app, queue_manager.is_ready) + + webserver_thread = create_webserver_thread_from_settings(app, settings) + webserver_thread.start() + + callback = make_queue_message_callback(process_fn, settings) + queue_manager.start_consuming(callback) diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py index fa2d5de..29874ea 100644 --- a/scripts/start_pyinfra.py +++ b/scripts/start_pyinfra.py @@ -1,17 +1,21 @@ +import argparse import time from pathlib import Path -from typing import Union - -from fastapi import FastAPI from pyinfra.config.loader import load_settings -from pyinfra.queue.callback import make_queue_message_callback -from pyinfra.queue.manager import QueueManager -from pyinfra.webserver.prometheus import ( - make_prometheus_processing_time_decorator_from_settings, - add_prometheus_endpoint, -) -from pyinfra.webserver.utils import create_webserver_thread_from_settings, add_health_check_endpoint +from pyinfra.examples import start_queue_consumer_with_prometheus_and_health_endpoints + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--settings_path", + "-s", + type=Path, + default=None, + help="Path to settings file or folder. Must be a .toml file or a folder containing .toml files.", + ) + return parser.parse_args() def processor_mock(_data: dict, _message: dict) -> dict: @@ -19,22 +23,6 @@ def processor_mock(_data: dict, _message: dict) -> dict: return {"result1": "result1"} -def start_serving(process_fn, settings_path: Union[str, Path] = None): - settings = load_settings(settings_path) - app = FastAPI() - app = add_prometheus_endpoint(app) - process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) - - queue_manager = QueueManager(settings) - - app = add_health_check_endpoint(app, queue_manager.is_ready) - - webserver_thread = create_webserver_thread_from_settings(app, settings) - webserver_thread.start() - - callback = make_queue_message_callback(process_fn, settings) - queue_manager.start_consuming(callback) - - if __name__ == "__main__": - start_serving(processor_mock) + settings = load_settings(parse_args().settings_path) + start_queue_consumer_with_prometheus_and_health_endpoints(processor_mock, settings)