From eef371e2a867a95da0f023af239d42af9c6fb9d8 Mon Sep 17 00:00:00 2001
From: Francisco Schulz
Date: Thu, 16 Feb 2023 10:47:13 +0100
Subject: [PATCH] update serve.py to work with new pyinfra version

---
 src/serve.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/serve.py b/src/serve.py
index 81405bd..c384aa3 100644
--- a/src/serve.py
+++ b/src/serve.py
@@ -22,27 +22,29 @@ def analysis_callback(queue_message: dict):
         "dossierId", "fileId", "targetFileExtension", "responseFileExtension", "operation"
     )(queue_message)
 
     bucket = PYINFRA_CONFIG.storage_bucket
 
-    logging.info(f"Processing {dossier_id=}/{file_id=}, {operation=}.")
+    logging.info("running operation %s file_id=%s and dossier_id=%s", operation, file_id, dossier_id)
 
     storage = get_storage(PYINFRA_CONFIG)
     object_name = f"{dossier_id}/{file_id}.{target_file_ext}"
 
     if storage.exists(bucket, object_name):
-        should_publish_result = True
-
         object_bytes = gzip.decompress(storage.get_object(bucket, object_name))
         analysis_fn = get_analysis_pipeline(operation, CV_CONFIG.table_parsing_skip_pages_without_images)
         results = analysis_fn(object_bytes)
+        logging.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id)
+
         response = {**queue_message, "data": list(results)}
         response = gzip.compress(json.dumps(response).encode())
         response_name = f"{dossier_id}/{file_id}.{response_file_ext}"
+        logging.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id)
         storage.put_object(bucket, response_name, response)
-    else:
-        should_publish_result = False
+
+        return {"dossierId": dossier_id, "fileId": file_id}
 
-    return should_publish_result, {"dossierId": dossier_id, "fileId": file_id}
+    else:
+        return None
 
 
 if __name__ == "__main__":