86 lines
3.7 KiB
Python
86 lines
3.7 KiB
Python
import gzip
|
|
import io
|
|
import json
|
|
import logging
|
|
import sys
|
|
|
|
from image_prediction.config import Config
|
|
from image_prediction.locations import CONFIG_FILE
|
|
from image_prediction.pipeline import load_pipeline
|
|
from image_prediction.utils.banner import load_banner
|
|
from image_prediction.utils.process_wrapping import wrap_in_process
|
|
from pyinfra import config
|
|
from pyinfra.queue.queue_manager import QueueManager
|
|
from pyinfra.storage.storage import get_storage
|
|
|
|
# Service configuration: pyinfra settings (used below for the log level, and elsewhere
# for the storage bucket and queue manager) plus the image-prediction settings read
# from CONFIG_FILE.
PYINFRA_CONFIG = config.get_config()
IMAGE_CONFIG = Config(CONFIG_FILE)

# Layout of every record emitted by this module's logger on stdout.
LOG_FORMAT = "%(asctime)s [%(levelname)s] - [%(filename)s -> %(funcName)s() -> %(lineno)s] : %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

stream_handler_format = logging.Formatter(fmt=LOG_FORMAT, datefmt=DATE_FORMAT)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(stream_handler_format)

# Named service logger; its threshold follows the pyinfra root logging level.
logger = logging.getLogger("image-prediction/serve")
logger.addHandler(stream_handler)
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
|
|
|
|
|
|
# A component of the callback (probably TensorFlow) does not release allocated memory (see RED-4206).
# See: https://stackoverflow.com/questions/39758094/clearing-tensorflow-gpu-memory-after-model-execution
# Workaround: let the operating system reclaim that memory by running the callback in a sub-process.
# FIXME: Find a more fine-grained solution; if the problem persists across Python services,
# FIXME: move the process wrapper to a general module (see RED-4929).
|
|
@wrap_in_process
def process_request(request_message):
    """Classify the images of one stored file and write the predictions back to storage.

    Args:
        request_message: Queue message. It must contain ``dossierId``, ``fileId``,
            ``targetFileExtension`` and ``responseFileExtension``. All of its keys are
            also copied into the stored result.

    Returns:
        ``{"dossierId": ..., "fileId": ...}`` after the gzipped JSON result has been
        written to ``<dossierId>/<fileId>.<responseFileExtension>``. Returns ``None``
        when the target file does not exist in the bucket.
    """
    dossier_id = request_message["dossierId"]
    file_id = request_message["fileId"]
    target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
    response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"
    figure_data_file_name = f"{dossier_id}/{file_id}.FIGURE.json.gz"
    logger.info("Processing file %s w/ file_id=%s, and dossier_id=%s", target_file_name, file_id, dossier_id)

    bucket = PYINFRA_CONFIG.storage_bucket
    storage = get_storage(PYINFRA_CONFIG)

    if not storage.exists(bucket, target_file_name):
        logger.info("no files found for file_id=%s, and dossier_id=%s", file_id, dossier_id)
        return None

    # Only pay for the model load once we know there is a file to classify. The load
    # happens per call because the callback is wrapped in a sub-process (RED-4206).
    logger.debug("loading model pipeline")
    pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)

    logger.info("fetching file for file_id=%s, and dossier_id=%s", file_id, dossier_id)
    pdf_bytes = gzip.decompress(storage.get_object(bucket, target_file_name))
    classifications = list(pipeline(pdf=pdf_bytes))

    # Figure metadata is optional; without it the "dataCV" section stays empty.
    if storage.exists(bucket, figure_data_file_name):
        logger.info("fetching figures for file_id=%s, and dossier_id=%s", file_id, dossier_id)
        metadata_bytes = gzip.decompress(storage.get_object(bucket, figure_data_file_name))
        metadata_per_image = json.loads(metadata_bytes)["data"]
        classifications_cv = list(pipeline(pdf=pdf_bytes, metadata_per_image=metadata_per_image))
    else:
        classifications_cv = []

    result = {**request_message, "data": classifications, "dataCV": classifications_cv}
    logger.info("predictions ready for file_id=%s, and dossier_id=%s", file_id, dossier_id)
    storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
    storage.put_object(bucket, response_file_name, storage_bytes)

    return {"dossierId": dossier_id, "fileId": file_id}
|
|
|
|
|
|
def main():
    """Log the service banner, then register ``process_request`` as the queue callback.

    A ``QueueManager`` is built from the pyinfra configuration, and its
    ``start_consuming`` is handed ``process_request`` to handle incoming messages.
    """
    banner = load_banner()
    logger.info(banner)
    QueueManager(PYINFRA_CONFIG).start_consuming(process_request)


if __name__ == "__main__":
    main()
|