43 lines
1.6 KiB
Python
43 lines
1.6 KiB
Python
from typing import Callable
|
|
|
|
from dynaconf import Dynaconf
|
|
from kn_utils.logging import logger
|
|
|
|
from pyinfra.storage.connection import get_storage
|
|
from pyinfra.storage.utils import (
|
|
download_data_bytes_as_specified_in_message,
|
|
upload_data_as_specified_in_message,
|
|
DownloadedData,
|
|
)
|
|
|
|
DataProcessor = Callable[[dict[str, DownloadedData] | DownloadedData, dict], dict | list | str]
|
|
Callback = Callable[[dict], dict]
|
|
|
|
|
|
def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf) -> Callback:
|
|
"""Default callback for processing queue messages.
|
|
|
|
Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage
|
|
will be configured to use that tenant id, otherwise the storage is configured as specified in the settings.
|
|
The data is the passed to the dataprocessor, together with the message. The dataprocessor should return a
|
|
json serializable object. This object is then uploaded to the storage as specified in the message. The response
|
|
message is just the original message.
|
|
"""
|
|
|
|
def inner(queue_message_payload: dict) -> dict:
|
|
logger.info(f"Processing payload with download-process-upload callback...")
|
|
|
|
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
|
|
|
|
data: dict[str, DownloadedData] | DownloadedData = download_data_bytes_as_specified_in_message(
|
|
storage, queue_message_payload
|
|
)
|
|
|
|
result = data_processor(data, queue_message_payload)
|
|
|
|
upload_data_as_specified_in_message(storage, queue_message_payload, result)
|
|
|
|
return queue_message_payload
|
|
|
|
return inner
|