adapt serve script to advanced pyinfra API including monitoring of the processing time of images.

This commit is contained in:
Julius Unverfehrt 2023-03-16 16:25:19 +01:00
parent 0fa0c44c37
commit 030dc660e6

View File

@ -1,7 +1,3 @@
import gzip
import json
import logging
from image_prediction import logger
from image_prediction.config import Config
from image_prediction.locations import CONFIG_FILE
@ -9,8 +5,8 @@ from image_prediction.pipeline import load_pipeline
from image_prediction.utils.banner import load_banner
from image_prediction.utils.process_wrapping import wrap_in_process
from pyinfra import config
from pyinfra.payload_processing import make_payload_processor
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage.storage import get_storage
PYINFRA_CONFIG = config.get_config()
IMAGE_CONFIG = Config(CONFIG_FILE)
@ -18,50 +14,23 @@ IMAGE_CONFIG = Config(CONFIG_FILE)
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
# A component of the callback (probably tensorflow) does not release allocated memory (see RED-4206).
# A component of the processing pipeline (probably tensorflow) does not release allocated memory (see RED-4206).
# See: https://stackoverflow.com/questions/39758094/clearing-tensorflow-gpu-memory-after-model-execution
# Workaround: Manage Memory with the operating system, by wrapping the callback in a sub-process.
# Workaround: Manage Memory with the operating system, by wrapping the processing in a sub-process.
# FIXME: Find more fine-grained solution or if the problem occurs persistently for python services,
# FIXME: move the process wrapper to a general module (see RED-4929).
@wrap_in_process
def process_request(request_message):
dossier_id = request_message["dossierId"]
file_id = request_message["fileId"]
target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"
logger.info("Processing file %s w/ file_id=%s and dossier_id=%s", target_file_name, file_id, dossier_id)
bucket = PYINFRA_CONFIG.storage_bucket
storage = get_storage(PYINFRA_CONFIG)
logger.debug("loading model pipeline")
def process_data(data: bytes) -> list:
pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
if storage.exists(bucket, target_file_name):
logger.info("fetching file for file_id=%s and dossier_id=%s", file_id, dossier_id)
object_bytes = storage.get_object(bucket, target_file_name)
object_bytes = gzip.decompress(object_bytes)
classifications = list(pipeline(pdf=object_bytes))
logger.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id)
result = {**request_message, "data": classifications}
storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
logger.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id)
storage.put_object(bucket, response_file_name, storage_bytes)
return {"dossierId": dossier_id, "fileId": file_id}
else:
logger.info("no files found for file_id=%s and dossier_id=%s", file_id, dossier_id)
return None
return list(pipeline(data))
def main():
logger.info(load_banner())
process_payload = make_payload_processor(process_data, config=PYINFRA_CONFIG)
queue_manager = QueueManager(PYINFRA_CONFIG)
queue_manager.start_consuming(process_request)
queue_manager.start_consuming(process_payload)
if __name__ == "__main__":