From 030dc660e6060ae326c32fba8c2944a10866fbb6 Mon Sep 17 00:00:00 2001
From: Julius Unverfehrt
Date: Thu, 16 Mar 2023 16:25:19 +0100
Subject: [PATCH] adapt serve script to advanced pyinfra API including monitoring of the processing time of images.

---
 src/serve.py | 47 ++++++++---------------------------------------
 1 file changed, 8 insertions(+), 39 deletions(-)

diff --git a/src/serve.py b/src/serve.py
index e123213..cd2417b 100644
--- a/src/serve.py
+++ b/src/serve.py
@@ -1,7 +1,3 @@
-import gzip
-import json
-import logging
-
 from image_prediction import logger
 from image_prediction.config import Config
 from image_prediction.locations import CONFIG_FILE
@@ -9,8 +5,8 @@ from image_prediction.pipeline import load_pipeline
 from image_prediction.utils.banner import load_banner
 from image_prediction.utils.process_wrapping import wrap_in_process
 from pyinfra import config
+from pyinfra.payload_processing import make_payload_processor
 from pyinfra.queue.queue_manager import QueueManager
-from pyinfra.storage.storage import get_storage
 
 PYINFRA_CONFIG = config.get_config()
 IMAGE_CONFIG = Config(CONFIG_FILE)
@@ -18,50 +14,23 @@ IMAGE_CONFIG = Config(CONFIG_FILE)
 logger.setLevel(PYINFRA_CONFIG.logging_level_root)
 
 
-# A component of the callback (probably tensorflow) does not release allocated memory (see RED-4206).
+# A component of the processing pipeline (probably tensorflow) does not release allocated memory (see RED-4206).
 # See: https://stackoverflow.com/questions/39758094/clearing-tensorflow-gpu-memory-after-model-execution
-# Workaround: Manage Memory with the operating system, by wrapping the callback in a sub-process.
+# Workaround: Manage Memory with the operating system, by wrapping the processing in a sub-process.
 # FIXME: Find more fine-grained solution or if the problem occurs persistently for python services,
-# FIXME: move the process wrapper to a general module (see RED-4929).
 @wrap_in_process
-def process_request(request_message):
-    dossier_id = request_message["dossierId"]
-    file_id = request_message["fileId"]
-    target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
-    response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"
-    logger.info("Processing file %s w/ file_id=%s and dossier_id=%s", target_file_name, file_id, dossier_id)
-
-    bucket = PYINFRA_CONFIG.storage_bucket
-    storage = get_storage(PYINFRA_CONFIG)
-
-    logger.debug("loading model pipeline")
+def process_data(data: bytes) -> list:
     pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
-
-    if storage.exists(bucket, target_file_name):
-        logger.info("fetching file for file_id=%s and dossier_id=%s", file_id, dossier_id)
-        object_bytes = storage.get_object(bucket, target_file_name)
-        object_bytes = gzip.decompress(object_bytes)
-
-        classifications = list(pipeline(pdf=object_bytes))
-        logger.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id)
-
-        result = {**request_message, "data": classifications}
-        storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
-
-        logger.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id)
-        storage.put_object(bucket, response_file_name, storage_bytes)
-
-        return {"dossierId": dossier_id, "fileId": file_id}
-    else:
-        logger.info("no files found for file_id=%s and dossier_id=%s", file_id, dossier_id)
-        return None
+    return list(pipeline(data))
 
 
 def main():
     logger.info(load_banner())
 
+    process_payload = make_payload_processor(process_data, config=PYINFRA_CONFIG)
+
     queue_manager = QueueManager(PYINFRA_CONFIG)
-    queue_manager.start_consuming(process_request)
+    queue_manager.start_consuming(process_payload)
 
 
 if __name__ == "__main__":