"""Queue worker: run a processor over a gzipped storage object and store the result.

Each request message names a dossier/file pair. The target object is read
from storage, decompressed and handed to a processor. The processor output
is written back to storage as gzipped JSON.
"""
import gzip
import json
import logging
from typing import Callable

from pyinfra.config import get_config
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage import get_storage

logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def make_callback(processor: Callable, config=None):
    """Build a queue-message callback bound to a storage backend.

    Args:
        processor: Called with the decompressed bytes of the target object.
            Its return value is iterated and stored as a list under ``"data"``.
        config: pyinfra configuration providing ``storage_bucket``. When
            omitted, ``get_config()`` is called here, at call time. The
            previous ``config=get_config()`` default ran once at import time.

    Returns:
        A callable taking a request-message dict. The dict must contain
        ``dossierId``, ``fileId``, ``targetFileExtension`` and
        ``responseFileExtension``. The callable returns
        ``{"dossierId": ..., "fileId": ...}`` after storing the result, or
        ``None`` if the target object is missing from the bucket.
    """
    if config is None:
        config = get_config()
    bucket = config.storage_bucket
    storage = get_storage(config)

    def callback(request_message):
        dossier_id = request_message["dossierId"]
        file_id = request_message["fileId"]
        logger.info(f"Processing {dossier_id=} {file_id=} ...")

        # Object keys are "<dossierId>/<fileId>.<extension>".
        target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
        response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"

        if not storage.exists(bucket, target_file_name):
            logger.warning(f"{target_file_name=} not present in {bucket=}, cancelling processing...")
            return None

        # Stored objects are gzip-compressed, both for input and for output.
        object_bytes = gzip.decompress(storage.get_object(bucket, target_file_name))

        # Materialize the processor output so generators can be JSON-serialized.
        result_body = list(processor(object_bytes))
        # The response echoes the original request fields alongside the data.
        result = {**request_message, "data": result_body}

        storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
        storage.put_object(bucket, response_file_name, storage_bytes)

        return {"dossierId": dossier_id, "fileId": file_id}

    return callback


def process(body):
    """Return a fixed placeholder result; ``body`` is ignored."""
    return [{"response_key": "response_value"}]


def main():
    """Start consuming queue messages with the placeholder processor."""
    logger.info("Start consuming...")
    # Load the config once so the queue manager and storage callback share it.
    config = get_config()
    queue_manager = QueueManager(config)
    queue_manager.start_consuming(make_callback(process, config))


if __name__ == "__main__":
    main()