From 2b5c4f1e45001dbf6fcf8bfdefdd4d37a6dc4ba8 Mon Sep 17 00:00:00 2001
From: Julius Unverfehrt
Date: Tue, 23 Aug 2022 10:43:34 +0200
Subject: [PATCH] Pull request #28: queue callback: add storage lookup for
 input file, add should_publish flag to signal processing success to queue
 manager

Merge in RR/cv-analysis from RED-5009-extend-callback to master

Squashed commit of the following:

commit aa9871e11ca56c721024d702351a3b2d51d4c69d
Author: Julius Unverfehrt
Date:   Tue Aug 23 10:31:03 2022 +0200

    queue callback: add storage lookup for input file, add should_publish
    flag to signal processing success to queue manager
---
 incl/pyinfra |  2 +-
 src/serve.py | 25 +++++++++++++++++--------
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/incl/pyinfra b/incl/pyinfra
index 0f6512d..71ad2af 160000
--- a/incl/pyinfra
+++ b/incl/pyinfra
@@ -1 +1 @@
-Subproject commit 0f6512df5423df98d334f5735170cd1f7642998a
+Subproject commit 71ad2af4eb278a3718ad5385b06f07faa9059e9f
diff --git a/src/serve.py b/src/serve.py
index 69aeef1..8fd2fc9 100644
--- a/src/serve.py
+++ b/src/serve.py
@@ -19,19 +19,28 @@ def analysis_callback(queue_message: dict):
     dossier_id, file_id, target_file_ext, response_file_ext, operation = itemgetter(
         "dossierId", "fileId", "targetFileExtension", "responseFileExtension", "operation"
     )(queue_message)
+    bucket = PYINFRA_CONFIG.storage_bucket
     logging.info(f"Processing {dossier_id=}/{file_id=}, {operation=}.")
+
     storage = get_storage(PYINFRA_CONFIG)
     object_name = f"{dossier_id}/{file_id}.{target_file_ext}"
-    object_bytes = gzip.decompress(storage.get_object(PYINFRA_CONFIG.storage_bucket, object_name))
-    analysis_fn = get_analysis_pipeline(operation)
-    results = analysis_fn(object_bytes)
-    response = {**queue_message, "data": list(results)}
-    response = gzip.compress(json.dumps(response).encode())
-    response_name = f"{dossier_id}/{file_id}.{response_file_ext}"
+    if storage.exists(bucket, object_name):
+        should_publish_result = True
 
-    storage.put_object(PYINFRA_CONFIG.storage_bucket, response_name, response)
-    return {"dossierId": dossier_id, "fileId": file_id}
+        object_bytes = gzip.decompress(storage.get_object(bucket, object_name))
+        analysis_fn = get_analysis_pipeline(operation)
+
+        results = analysis_fn(object_bytes)
+        response = {**queue_message, "data": list(results)}
+        response = gzip.compress(json.dumps(response).encode())
+        response_name = f"{dossier_id}/{file_id}.{response_file_ext}"
+
+        storage.put_object(bucket, response_name, response)
+    else:
+        should_publish_result = False
+
+    return should_publish_result, {"dossierId": dossier_id, "fileId": file_id}
 
 
 if __name__ == "__main__":