From 87122ffb965c016383bfd49e2eaaa6ab3b5d7101 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 28 Mar 2023 15:50:29 +0200 Subject: [PATCH] Update pyinfra for multi-tenancy support Update serve script with PayloadProcessor from pyinfra --- incl/pyinfra | 2 +- src/serve.py | 66 ++++++++++++++-------------------------------------- 2 files changed, 19 insertions(+), 49 deletions(-) diff --git a/incl/pyinfra b/incl/pyinfra index 0f24a7f..793a427 160000 --- a/incl/pyinfra +++ b/incl/pyinfra @@ -1 +1 @@ -Subproject commit 0f24a7f26da3ce8ce326bf02b2d1946b6483be11 +Subproject commit 793a427c50d150523856c29b79a0000d7cde88ed diff --git a/src/serve.py b/src/serve.py index da461bc..bbeaeb3 100644 --- a/src/serve.py +++ b/src/serve.py @@ -1,67 +1,37 @@ -import gzip -import json import logging -from operator import itemgetter - -from funcy import compose from cv_analysis.config import get_config from cv_analysis.server.pipeline import get_analysis_pipeline -from cv_analysis.utils.banner import make_art from pyinfra import config as pyinfra_config -from pyinfra.payload_processing.monitor import get_monitor +from pyinfra.payload_processing import make_payload_processor from pyinfra.queue.queue_manager import QueueManager -from pyinfra.storage.storage import get_storage PYINFRA_CONFIG = pyinfra_config.get_config() CV_CONFIG = get_config() -logging.basicConfig(level=PYINFRA_CONFIG.logging_level_root) +logger = logging.getLogger() +logger.setLevel(PYINFRA_CONFIG.logging_level_root) -# TODO: add kwargs/ operation key passing to processing fn in pyinfra PayloadProcessor be able to use it here. -MONITOR = get_monitor(PYINFRA_CONFIG) +def make_dispatched_data_analysis(config): + skip_pages_without_images = config.table_parsing_skip_pages_without_images + + def inner(data: bytes, operation) -> list: + analyse = get_analysis_pipeline(operation, skip_pages_without_images) + return list(analyse(data)) + + return inner -def analysis_callback(queue_message: dict): +def main(): + logger.info("Application is starting") - dossier_id, file_id, target_file_ext, response_file_ext, operation = itemgetter( - "dossierId", "fileId", "targetFileExtension", "responseFileExtension", "operation" - )(queue_message) - bucket = PYINFRA_CONFIG.storage_bucket - logging.info("running operation %s file_id=%s and dossier_id=%s", operation, file_id, dossier_id) + process_data = make_dispatched_data_analysis(config=CV_CONFIG) + process_payload = make_payload_processor(process_data, config=PYINFRA_CONFIG) - storage = get_storage(PYINFRA_CONFIG) - object_name = f"{dossier_id}/{file_id}.{target_file_ext}" - - if storage.exists(bucket, object_name): - object_bytes = gzip.decompress(storage.get_object(bucket, object_name)) - analysis_fn = MONITOR( - compose( - list, - get_analysis_pipeline(operation, CV_CONFIG.table_parsing_skip_pages_without_images), - ) - ) - - results = list(analysis_fn(object_bytes)) - logging.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id) - - response = {**queue_message, "data": results} - response = gzip.compress(json.dumps(response).encode()) - response_name = f"{dossier_id}/{file_id}.{response_file_ext}" - - logging.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id) - storage.put_object(bucket, response_name, response) - - return {"dossierId": dossier_id, "fileId": file_id} - - else: - return None + queue_manager = QueueManager(PYINFRA_CONFIG) + queue_manager.start_consuming(process_payload) if __name__ == "__main__": - - logging.info(make_art()) - - queue_manager = QueueManager(PYINFRA_CONFIG) - queue_manager.start_consuming(analysis_callback) + main()