Adjust logs
This commit is contained in:
parent
429a85b609
commit
be602d8411
@ -28,6 +28,7 @@ def load_settings(settings_path: Union[str, Path] = None, validators: list[Valid
|
||||
)
|
||||
|
||||
validate_settings(settings, validators)
|
||||
logger.info("Settings loaded and validated.")
|
||||
|
||||
return settings
|
||||
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
|
||||
from pyinfra.queue.callback import make_queue_message_callback, DataProcessor
|
||||
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
|
||||
from pyinfra.queue.manager import QueueManager
|
||||
from pyinfra.webserver.prometheus import add_prometheus_endpoint, \
|
||||
make_prometheus_processing_time_decorator_from_settings
|
||||
@ -19,6 +20,8 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr
|
||||
|
||||
Adapt as needed.
|
||||
"""
|
||||
logger.info(f"Starting webserver and queue consumer...")
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
app = add_prometheus_endpoint(app)
|
||||
@ -31,5 +34,5 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr
|
||||
webserver_thread = create_webserver_thread_from_settings(app, settings)
|
||||
webserver_thread.start()
|
||||
|
||||
callback = make_queue_message_callback(process_fn, settings)
|
||||
callback = make_download_process_upload_callback(process_fn, settings)
|
||||
queue_manager.start_consuming(callback)
|
||||
|
||||
@ -9,7 +9,7 @@ from pyinfra.storage.utils import download_data_as_specified_in_message, upload_
|
||||
DataProcessor = Callable[[Union[dict, bytes], dict], dict]
|
||||
|
||||
|
||||
def make_queue_message_callback(data_processor: DataProcessor, settings: Dynaconf):
|
||||
def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf):
|
||||
"""Default callback for processing queue messages.
|
||||
Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage
|
||||
will be configured to use that tenant id, otherwise the storage is configured as specified in the settings.
|
||||
@ -21,7 +21,7 @@ def make_queue_message_callback(data_processor: DataProcessor, settings: Dynacon
|
||||
"""
|
||||
|
||||
def inner(queue_message_payload: dict) -> dict:
|
||||
logger.info(f"Processing payload...")
|
||||
logger.info(f"Processing payload with download-process-upload callback...")
|
||||
|
||||
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
|
||||
|
||||
|
||||
@ -135,10 +135,14 @@ class QueueManager:
|
||||
|
||||
def _make_on_message_callback(self, message_processor: MessageProcessor):
|
||||
def process_message_body_and_await_result(unpacked_message_body):
|
||||
# Processing the message in a separate thread is necessary for the main thread pika client to be able to
|
||||
# process data events (e.g. heartbeats) while the message is being processed.
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
|
||||
logger.debug("Processing payload in separate thread.")
|
||||
logger.info("Processing payload in separate thread.")
|
||||
future = thread_pool_executor.submit(message_processor, unpacked_message_body)
|
||||
|
||||
# FIXME: This block is probably not necessary, but kept since the implications of removing it are
|
||||
# unclear. Remove it in a future iteration where less changes are being made to the code base.
|
||||
while future.running():
|
||||
logger.debug("Waiting for payload processing to finish...")
|
||||
self.connection.process_data_events()
|
||||
|
||||
@ -20,6 +20,8 @@ def get_storage(settings: Dynaconf, tenant_id: str = None) -> Storage:
|
||||
In the future, when the default storage from config is no longer needed (only multi-tenant storage will be used),
|
||||
get_storage_from_tenant_id can replace this function directly.
|
||||
"""
|
||||
logger.info("Establishing storage connection...")
|
||||
|
||||
if tenant_id:
|
||||
logger.info(f"Using tenant storage for {tenant_id}.")
|
||||
return get_storage_from_tenant_id(tenant_id, settings)
|
||||
|
||||
@ -67,6 +67,7 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
|
||||
|
||||
data = gzip.decompress(data) if ".gz" in payload.targetFilePath else data
|
||||
data = json.loads(data.decode("utf-8")) if ".json" in payload.targetFilePath else data
|
||||
logger.info(f"Downloaded {payload.targetFilePath} from storage.")
|
||||
|
||||
return data
|
||||
|
||||
|
||||
@ -20,8 +20,7 @@ def parse_args():
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def processor_mock(_data: dict, message: dict) -> dict:
|
||||
logger.info(f"Received message for tenant {message.get('X-TENANT-ID')}")
|
||||
def processor_mock(_data: dict, _message: dict) -> dict:
|
||||
time.sleep(5)
|
||||
return {"result1": "result1"}
|
||||
|
||||
|
||||
@ -3,8 +3,6 @@ import pytest
|
||||
from pyinfra.config.loader import load_settings, pyinfra_config_path
|
||||
from pyinfra.storage.connection import get_storage_from_settings
|
||||
|
||||
pytest_plugins = ["docker_compose"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def settings():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user