From c18475a77d71303c866a9df826703c74f48edecc Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Wed, 24 Jan 2024 17:46:54 +0100 Subject: [PATCH] feat(opentelemetry): improve readability --- pyinfra/examples.py | 12 ++++++++---- pyinfra/queue/manager.py | 3 +-- scripts/start_pyinfra.py | 9 ++++++--- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pyinfra/examples.py b/pyinfra/examples.py index ab00c4f..21a82a7 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -5,7 +5,7 @@ from kn_utils.logging import logger from pyinfra.config.loader import validate_settings, get_all_validators from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor from pyinfra.queue.manager import QueueManager -from pyinfra.utils.opentelemetry import setup_trace, instrument_pika, instrument_app +from pyinfra.utils.opentelemetry import setup_trace, instrument_pika from pyinfra.webserver.prometheus import ( add_prometheus_endpoint, make_prometheus_processing_time_decorator_from_settings, @@ -13,11 +13,15 @@ from pyinfra.webserver.prometheus import ( from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings -def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf): +def start_standard_queue_consumer( + process_fn: DataProcessor, + settings: Dynaconf, + app: FastAPI = None, +): """Default serving logic for research services. Supplies /health, /ready and /prometheus endpoints (if enabled). The process_fn is monitored for processing time per - call. Also traces the queue messages (if enabled). + call. Also traces the queue messages via openTelemetry (if enabled). Workload is only received via queue messages. The message contains a file path to the data to be processed, which gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should return a json serializable object. This object is then uploaded to the storage. The response message is just the @@ -29,7 +33,7 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr logger.info(f"Starting webserver and queue consumer...") - app = FastAPI() + app = app or FastAPI() queue_manager = QueueManager(settings) diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index e284438..b71f0bd 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -141,11 +141,10 @@ class QueueManager: logger.info("Processing payload in separate thread.") future = thread_pool_executor.submit(message_processor, unpacked_message_body) - # FIXME: This block is probably not necessary, but kept since the implications of removing it are + # TODO: This block is probably not necessary, but kept since the implications of removing it are # unclear. Remove it in a future iteration where less changes are being made to the code base. while future.running(): logger.debug("Waiting for payload processing to finish...") - self.connection.process_data_events() self.connection.sleep(self.connection_sleep) return future.result() diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py index 4cda348..69fe2d9 100644 --- a/scripts/start_pyinfra.py +++ b/scripts/start_pyinfra.py @@ -1,7 +1,9 @@ import time +from dynaconf import Dynaconf + from pyinfra.config.loader import load_settings, parse_args -from pyinfra.examples import start_queue_consumer_with_prometheus_and_health_endpoints +from pyinfra.examples import start_standard_queue_consumer def processor_mock(_data: dict, _message: dict) -> dict: @@ -10,5 +12,6 @@ def processor_mock(_data: dict, _message: dict) -> dict: if __name__ == "__main__": - settings = load_settings(parse_args().settings_path) - start_queue_consumer_with_prometheus_and_health_endpoints(processor_mock, settings) + arguments = parse_args() + settings = load_settings(arguments.settings_path) + start_standard_queue_consumer(processor_mock, settings)