From 81520b1a532575e4475937af817c982476cb3761 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Fri, 17 Mar 2023 16:12:59 +0100 Subject: [PATCH] Pull request #46: RED-6205 add prometheus monitoring Merge in RR/image-prediction from RED-6205-add-prometheus-monitoring to master Squashed commit of the following: commit 6932b5ee579a31d0317dc3f76acb8dd2845fdb4b Author: Julius Unverfehrt Date: Thu Mar 16 17:30:57 2023 +0100 update pyinfra commit d6e55534623eae2edcddaa6dd333f93171d421dc Author: Julius Unverfehrt Date: Thu Mar 16 16:30:14 2023 +0100 set pyinfra subproject to current master commit commit 030dc660e6060ae326c32fba8c2944a10866fbb6 Author: Julius Unverfehrt Date: Thu Mar 16 16:25:19 2023 +0100 adapt serve script to advanced pyinfra API including monitoring of the processing time of images. commit 0fa0c44c376c52653e517d257a35793797f7be31 Author: Julius Unverfehrt Date: Thu Mar 16 15:19:57 2023 +0100 Update dockerfile to work with new pyinfra package setup utilizing pyproject.toml instead of setup.py and requirements.txt commit aad53c4d313f908de93a13e69e2cb150db3be6cb Author: Julius Unverfehrt Date: Thu Mar 16 14:16:04 2023 +0100 remove no longer needed dependencies --- .gitmodules | 3 --- Dockerfile | 6 +----- Dockerfile_tests | 6 +----- incl/pdf2image | 1 - incl/pyinfra | 2 +- src/serve.py | 47 ++++++++--------------------------------------- 6 files changed, 11 insertions(+), 54 deletions(-) delete mode 160000 incl/pdf2image diff --git a/.gitmodules b/.gitmodules index 6f15bbc..8ff9112 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "incl/pyinfra"] path = incl/pyinfra url = ssh://git@git.iqser.com:2222/rr/pyinfra.git -[submodule "incl/pdf2image"] - path = incl/pdf2image - url = ssh://git@git.iqser.com:2222/rr/pdf2image.git diff --git a/Dockerfile b/Dockerfile index fdae322..66d472e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,6 @@ WORKDIR /app/service COPY src src COPY incl/pyinfra incl/pyinfra -COPY incl/pdf2image incl/pdf2image 
COPY data data COPY image_prediction image_prediction COPY setup.py setup.py @@ -14,12 +13,9 @@ COPY banner.txt banner.txt # Install dependencies differing from base image. RUN python3 -m pip install -r requirements.txt -RUN python3 -m pip install -r incl/pyinfra/requirements.txt -RUN python3 -m pip install -r incl/pdf2image/requirements.txt RUN python3 -m pip install -e . -RUN python3 -m pip install -e incl/pyinfra -RUN python3 -m pip install -e incl/pdf2image +RUN python3 -m pip install incl/pyinfra EXPOSE 5000 EXPOSE 8080 diff --git a/Dockerfile_tests b/Dockerfile_tests index 211c238..831f8c6 100644 --- a/Dockerfile_tests +++ b/Dockerfile_tests @@ -7,7 +7,6 @@ WORKDIR /app/service COPY src src COPY incl/pyinfra incl/pyinfra -COPY incl/pdf2image incl/pdf2image COPY data data COPY image_prediction image_prediction COPY setup.py setup.py @@ -17,12 +16,9 @@ COPY banner.txt banner.txt # Install module & dependencies RUN python3 -m pip install -r requirements.txt -RUN python3 -m pip install -r incl/pyinfra/requirements.txt -RUN python3 -m pip install -r incl/pdf2image/requirements.txt RUN python3 -m pip install -e . 
-RUN python3 -m pip install -e incl/pyinfra -RUN python3 -m pip install -e incl/pdf2image +RUN python3 -m pip install incl/pyinfra RUN apt update --yes RUN apt install vim --yes diff --git a/incl/pdf2image b/incl/pdf2image deleted file mode 160000 index f7292c3..0000000 --- a/incl/pdf2image +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f7292c30ad7c7ae5f07cee6925adda096301b60a diff --git a/incl/pyinfra b/incl/pyinfra index 4615703..ff6f437 160000 --- a/incl/pyinfra +++ b/incl/pyinfra @@ -1 +1 @@ -Subproject commit 46157031b588b4bda03f8ddbb9c2e2fadbb47af5 +Subproject commit ff6f437e8491d48c1e0ccb08ad7b164477e15fbe diff --git a/src/serve.py b/src/serve.py index e123213..cd2417b 100644 --- a/src/serve.py +++ b/src/serve.py @@ -1,7 +1,3 @@ -import gzip -import json -import logging - from image_prediction import logger from image_prediction.config import Config from image_prediction.locations import CONFIG_FILE @@ -9,8 +5,8 @@ from image_prediction.pipeline import load_pipeline from image_prediction.utils.banner import load_banner from image_prediction.utils.process_wrapping import wrap_in_process from pyinfra import config +from pyinfra.payload_processing import make_payload_processor from pyinfra.queue.queue_manager import QueueManager -from pyinfra.storage.storage import get_storage PYINFRA_CONFIG = config.get_config() IMAGE_CONFIG = Config(CONFIG_FILE) @@ -18,50 +14,23 @@ IMAGE_CONFIG = Config(CONFIG_FILE) logger.setLevel(PYINFRA_CONFIG.logging_level_root) -# A component of the callback (probably tensorflow) does not release allocated memory (see RED-4206). +# A component of the processing pipeline (probably tensorflow) does not release allocated memory (see RED-4206). # See: https://stackoverflow.com/questions/39758094/clearing-tensorflow-gpu-memory-after-model-execution -# Workaround: Manage Memory with the operating system, by wrapping the callback in a sub-process. 
+# Workaround: Manage Memory with the operating system, by wrapping the processing in a sub-process. # FIXME: Find more fine-grained solution or if the problem occurs persistently for python services, -# FIXME: move the process wrapper to a general module (see RED-4929). @wrap_in_process -def process_request(request_message): - dossier_id = request_message["dossierId"] - file_id = request_message["fileId"] - target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}" - response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}" - logger.info("Processing file %s w/ file_id=%s and dossier_id=%s", target_file_name, file_id, dossier_id) - - bucket = PYINFRA_CONFIG.storage_bucket - storage = get_storage(PYINFRA_CONFIG) - - logger.debug("loading model pipeline") +def process_data(data: bytes) -> list: pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size) - - if storage.exists(bucket, target_file_name): - logger.info("fetching file for file_id=%s and dossier_id=%s", file_id, dossier_id) - object_bytes = storage.get_object(bucket, target_file_name) - object_bytes = gzip.decompress(object_bytes) - - classifications = list(pipeline(pdf=object_bytes)) - logger.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id) - - result = {**request_message, "data": classifications} - storage_bytes = gzip.compress(json.dumps(result).encode("utf-8")) - - logger.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id) - storage.put_object(bucket, response_file_name, storage_bytes) - - return {"dossierId": dossier_id, "fileId": file_id} - else: - logger.info("no files found for file_id=%s and dossier_id=%s", file_id, dossier_id) - return None + return list(pipeline(data)) def main(): logger.info(load_banner()) + process_payload = make_payload_processor(process_data, config=PYINFRA_CONFIG) + queue_manager = QueueManager(PYINFRA_CONFIG) 
- queue_manager.start_consuming(process_request) + queue_manager.start_consuming(process_payload) if __name__ == "__main__":