From be602d8411b3664eb5027eed51a3ecb001bf3ee3 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 23 Jan 2024 14:10:56 +0100 Subject: [PATCH] Adjust logs --- pyinfra/config/loader.py | 1 + pyinfra/examples.py | 7 +++++-- pyinfra/queue/callback.py | 4 ++-- pyinfra/queue/manager.py | 6 +++++- pyinfra/storage/connection.py | 2 ++ pyinfra/storage/utils.py | 1 + scripts/start_pyinfra.py | 3 +-- tests/conftest.py | 2 -- 8 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pyinfra/config/loader.py b/pyinfra/config/loader.py index 1e10c44..d89a95d 100644 --- a/pyinfra/config/loader.py +++ b/pyinfra/config/loader.py @@ -28,6 +28,7 @@ def load_settings(settings_path: Union[str, Path] = None, validators: list[Valid ) validate_settings(settings, validators) + logger.info("Settings loaded and validated.") return settings diff --git a/pyinfra/examples.py b/pyinfra/examples.py index 9297101..0bc512a 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -1,7 +1,8 @@ from dynaconf import Dynaconf from fastapi import FastAPI +from kn_utils.logging import logger -from pyinfra.queue.callback import make_queue_message_callback, DataProcessor +from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor from pyinfra.queue.manager import QueueManager from pyinfra.webserver.prometheus import add_prometheus_endpoint, \ make_prometheus_processing_time_decorator_from_settings @@ -19,6 +20,8 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr Adapt as needed. """ + logger.info(f"Starting webserver and queue consumer...") + app = FastAPI() app = add_prometheus_endpoint(app) @@ -31,5 +34,5 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread.start() - callback = make_queue_message_callback(process_fn, settings) + callback = make_download_process_upload_callback(process_fn, settings) queue_manager.start_consuming(callback) diff --git a/pyinfra/queue/callback.py b/pyinfra/queue/callback.py index 3fa8f22..f678f27 100644 --- a/pyinfra/queue/callback.py +++ b/pyinfra/queue/callback.py @@ -9,7 +9,7 @@ from pyinfra.storage.utils import download_data_as_specified_in_message, upload_ DataProcessor = Callable[[Union[dict, bytes], dict], dict] -def make_queue_message_callback(data_processor: DataProcessor, settings: Dynaconf): +def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf): """Default callback for processing queue messages. Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage will be configured to use that tenant id, otherwise the storage is configured as specified in the settings. @@ -21,7 +21,7 @@ def make_queue_message_callback(data_processor: DataProcessor, settings: Dynacon """ def inner(queue_message_payload: dict) -> dict: - logger.info(f"Processing payload...") + logger.info(f"Processing payload with download-process-upload callback...") storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID")) diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index da49299..767d723 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -135,10 +135,14 @@ class QueueManager: def _make_on_message_callback(self, message_processor: MessageProcessor): def process_message_body_and_await_result(unpacked_message_body): + # Processing the message in a separate thread is necessary for the main thread pika client to be able to + # process data events (e.g. heartbeats) while the message is being processed. with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor: - logger.debug("Processing payload in separate thread.") + logger.info("Processing payload in separate thread.") future = thread_pool_executor.submit(message_processor, unpacked_message_body) + # FIXME: This block is probably not necessary, but kept since the implications of removing it are + # unclear. Remove it in a future iteration where less changes are being made to the code base. while future.running(): logger.debug("Waiting for payload processing to finish...") self.connection.process_data_events() diff --git a/pyinfra/storage/connection.py b/pyinfra/storage/connection.py index abe0d5f..75d278a 100644 --- a/pyinfra/storage/connection.py +++ b/pyinfra/storage/connection.py @@ -20,6 +20,8 @@ def get_storage(settings: Dynaconf, tenant_id: str = None) -> Storage: In the future, when the default storage from config is no longer needed (only multi-tenant storage will be used), get_storage_from_tenant_id can replace this function directly. """ + logger.info("Establishing storage connection...") + if tenant_id: logger.info(f"Using tenant storage for {tenant_id}.") return get_storage_from_tenant_id(tenant_id, settings) diff --git a/pyinfra/storage/utils.py b/pyinfra/storage/utils.py index d125115..bb3598b 100644 --- a/pyinfra/storage/utils.py +++ b/pyinfra/storage/utils.py @@ -67,6 +67,7 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) - data = gzip.decompress(data) if ".gz" in payload.targetFilePath else data data = json.loads(data.decode("utf-8")) if ".json" in payload.targetFilePath else data + logger.info(f"Downloaded {payload.targetFilePath} from storage.") return data diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py index de57496..b89d4c3 100644 --- a/scripts/start_pyinfra.py +++ b/scripts/start_pyinfra.py @@ -20,8 +20,7 @@ def parse_args(): return parser.parse_args() -def processor_mock(_data: dict, message: dict) -> dict: - logger.info(f"Received message for tenant {message.get('X-TENANT-ID')}") +def processor_mock(_data: dict, _message: dict) -> dict: time.sleep(5) return {"result1": "result1"} diff --git a/tests/conftest.py b/tests/conftest.py index 2dd03d8..4939c83 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,6 @@ import pytest from pyinfra.config.loader import load_settings, pyinfra_config_path from pyinfra.storage.connection import get_storage_from_settings -pytest_plugins = ["docker_compose"] - @pytest.fixture(scope="session") def settings():