Pull request #24: queue callback: add storage lookup for input file, add should_publish flag to signal processing success to queue manager

Merge in RR/image-prediction from RED-5009-extend-callback to master

Squashed commit of the following:

commit 5ed02af09812783c46c2fb47832fe3a02344aa03
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Aug 23 10:56:37 2022 +0200

    queue callback: add storage lookup for input file, add should_publish flag to signal processing success to queue manager
Committed by Julius Unverfehrt on 2022-08-23 12:47:49 +02:00
parent d13b8436e2
commit d1190f7efe
2 changed files with 25 additions and 23 deletions

@@ -1 +1 @@
-Subproject commit be82114f8302ffedecf950c6ca9fecf01ece5573
+Subproject commit 71ad2af4eb278a3718ad5385b06f07faa9059e9f

@@ -20,36 +20,38 @@ logger.setLevel(PYINFRA_CONFIG.logging_level_root)
 def process_request(request_message):
-    pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
-    target_file_extension = request_message["targetFileExtension"]
     dossier_id = request_message["dossierId"]
     file_id = request_message["fileId"]
+    target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
+    response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"
+    figure_data_file_name = f"{dossier_id}/{file_id}.FIGURE.json.gz"
+    bucket = PYINFRA_CONFIG.storage_bucket
     storage = get_storage(PYINFRA_CONFIG)
-    object_bytes = storage.get_object(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{target_file_extension}")
-    object_bytes = gzip.decompress(object_bytes)
+    pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
-    if storage.exists(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.FIGURE.json.gz"):
-        metadata_bytes = storage.get_object(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.FIGURE.json.gz")
-        metadata_bytes = gzip.decompress(metadata_bytes)
-        metadata_per_image = json.load(io.BytesIO(metadata_bytes))["data"]
-        classifications_cv = list(pipeline(pdf=object_bytes, metadata_per_image=metadata_per_image))
+    if storage.exists(bucket, target_file_name):
+        should_publish_result = True
+        object_bytes = storage.get_object(bucket, target_file_name)
+        object_bytes = gzip.decompress(object_bytes)
+        classifications = list(pipeline(pdf=object_bytes))
+        if storage.exists(bucket, figure_data_file_name):
+            metadata_bytes = storage.get_object(bucket, figure_data_file_name)
+            metadata_bytes = gzip.decompress(metadata_bytes)
+            metadata_per_image = json.load(io.BytesIO(metadata_bytes))["data"]
+            classifications_cv = list(pipeline(pdf=object_bytes, metadata_per_image=metadata_per_image))
+        else:
+            classifications_cv = []
+        result = {**request_message, "data": classifications, "dataCV": classifications_cv}
+        storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
+        storage.put_object(bucket, response_file_name, storage_bytes)
     else:
-        classifications_cv = []
+        should_publish_result = False
-    classifications = list(pipeline(pdf=object_bytes))
-    result = {**request_message, "data": classifications, "dataCV": classifications_cv}
-    response_file_extension = request_message["responseFileExtension"]
-    storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
-    storage.put_object(
-        PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{response_file_extension}", storage_bytes
-    )
-    return {"dossierId": dossier_id, "fileId": file_id}
+    return should_publish_result, {"dossierId": dossier_id, "fileId": file_id}
 def main():
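
Note on the new contract: process_request no longer assumes the input object exists in storage, and it now reports success to its caller instead of publishing unconditionally. A minimal sketch of the consuming side, assuming a callback-style queue manager; handle_delivery, the channel object, and its publish method are hypothetical names, since the queue-manager code is not part of this diff:

    import json
    import logging

    logger = logging.getLogger(__name__)

    def handle_delivery(channel, request_message):
        # process_request (defined above) now returns a 2-tuple:
        # (should_publish_result, {"dossierId": ..., "fileId": ...}).
        should_publish, payload = process_request(request_message)
        if should_publish:
            # The result file is already written to storage; only the small
            # acknowledgement payload goes back onto the queue.
            channel.publish(json.dumps(payload))  # hypothetical publish API
        else:
            # The input file was missing from storage, so nothing was
            # processed and no success message should be published.
            logger.warning(
                "input object not found for dossier %s, file %s; skipping publish",
                payload["dossierId"], payload["fileId"],
            )

Returning the flag instead of raising lets the queue manager distinguish "input not yet in storage" from a hard processing failure and decide whether to acknowledge, retry, or drop the message.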