Pull request #28: queue callback: add storage lookup for input file, add should_publish flag to signal processing success to queue manager

Merge in RR/cv-analysis from RED-5009-extend-callback to master

Squashed commit of the following:

commit aa9871e11ca56c721024d702351a3b2d51d4c69d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Aug 23 10:31:03 2022 +0200

    queue callback: add storage lookup for input file, add should_publish flag to signal processing success to queue manager
This commit is contained in:
Julius Unverfehrt 2022-08-23 10:43:34 +02:00
parent ce3b460217
commit 2b5c4f1e45
2 changed files with 18 additions and 9 deletions

@@ -1 +1 @@
Subproject commit 0f6512df5423df98d334f5735170cd1f7642998a
Subproject commit 71ad2af4eb278a3718ad5385b06f07faa9059e9f

View File

@@ -19,19 +19,28 @@ def analysis_callback(queue_message: dict):
dossier_id, file_id, target_file_ext, response_file_ext, operation = itemgetter(
"dossierId", "fileId", "targetFileExtension", "responseFileExtension", "operation"
)(queue_message)
bucket = PYINFRA_CONFIG.storage_bucket
logging.info(f"Processing {dossier_id=}/{file_id=}, {operation=}.")
storage = get_storage(PYINFRA_CONFIG)
object_name = f"{dossier_id}/{file_id}.{target_file_ext}"
object_bytes = gzip.decompress(storage.get_object(PYINFRA_CONFIG.storage_bucket, object_name))
analysis_fn = get_analysis_pipeline(operation)
results = analysis_fn(object_bytes)
response = {**queue_message, "data": list(results)}
response = gzip.compress(json.dumps(response).encode())
response_name = f"{dossier_id}/{file_id}.{response_file_ext}"
if storage.exists(bucket, object_name):
should_publish_result = True
storage.put_object(PYINFRA_CONFIG.storage_bucket, response_name, response)
return {"dossierId": dossier_id, "fileId": file_id}
object_bytes = gzip.decompress(storage.get_object(bucket, object_name))
analysis_fn = get_analysis_pipeline(operation)
results = analysis_fn(object_bytes)
response = {**queue_message, "data": list(results)}
response = gzip.compress(json.dumps(response).encode())
response_name = f"{dossier_id}/{file_id}.{response_file_ext}"
storage.put_object(bucket, response_name, response)
else:
should_publish_result = False
return should_publish_result, {"dossierId": dossier_id, "fileId": file_id}
if __name__ == "__main__":