import gzip
import json
import logging

from image_prediction.config import Config
from image_prediction.locations import CONFIG_FILE
from image_prediction.pipeline import load_pipeline
from image_prediction.utils.banner import load_banner
from image_prediction.utils.process_wrapping import wrap_in_process
from pyinfra import config
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage.storage import get_storage

# Configuration is resolved once at import time and shared by everything below:
# PYINFRA_CONFIG is passed to the storage and queue helpers, IMAGE_CONFIG supplies
# the pipeline settings (service.verbose, service.batch_size).
PYINFRA_CONFIG = config.get_config()
IMAGE_CONFIG = Config(CONFIG_FILE)

# Attach a stream handler to the root logger; the "main" logger used by this
# module takes its level from the pyinfra setting `logging_level_root`.
logging.getLogger().addHandler(logging.StreamHandler())
logger = logging.getLogger("main")
logger.setLevel(PYINFRA_CONFIG.logging_level_root)


# A component of the callback (probably tensorflow) does not release allocated memory (see RED-4206).
# See: https://stackoverflow.com/questions/39758094/clearing-tensorflow-gpu-memory-after-model-execution
# Workaround: let the operating system manage the memory by wrapping the callback in a sub-process.
# FIXME: Find a more fine-grained solution or, if the problem occurs persistently for Python services,
# FIXME: move the process wrapper to a general module (see RED-4929).
@wrap_in_process
def process_request(request_message):
    """Run the image-prediction pipeline on one stored file and upload the result.

    The target object ``<dossierId>/<fileId>.<targetFileExtension>`` is read from
    the configured storage bucket, gunzipped and passed to the pipeline as
    ``pdf=`` bytes. The classifications are merged into a copy of the request
    under the ``"data"`` key, JSON-encoded, gzipped and written to
    ``<dossierId>/<fileId>.<responseFileExtension>`` in the same bucket.

    Args:
        request_message: Mapping with the keys ``dossierId``, ``fileId``,
            ``targetFileExtension`` and ``responseFileExtension``.

    Returns:
        A ``(publish_result, response)`` tuple. ``publish_result`` is ``False``
        when the target file does not exist in storage and ``True`` once a
        result has been stored; ``response`` always carries ``dossierId`` and
        ``fileId``. (Presumably the queue manager uses the flag to decide
        whether to publish ``response`` -- confirm against pyinfra.)

    Raises:
        KeyError: If one of the required keys is missing from the request.
    """
    dossier_id = request_message["dossierId"]
    file_id = request_message["fileId"]
    response = {"dossierId": dossier_id, "fileId": file_id}

    logger.info(f"Processing {dossier_id=} {file_id=} ...")

    target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
    response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"

    bucket = PYINFRA_CONFIG.storage_bucket
    storage = get_storage(PYINFRA_CONFIG)

    # Guard clause: bail out before building the pipeline, which is pointless
    # work when there is nothing to classify.
    if not storage.exists(bucket, target_file_name):
        logger.warning(f"Target file {target_file_name} not found in bucket {bucket}, skipping.")
        return False, response

    pipeline = load_pipeline(
        verbose=IMAGE_CONFIG.service.verbose,
        batch_size=IMAGE_CONFIG.service.batch_size,
    )

    # Stored objects are gzip-compressed on both the read and the write side.
    object_bytes = gzip.decompress(storage.get_object(bucket, target_file_name))
    classifications = list(pipeline(pdf=object_bytes))

    result = {**request_message, "data": classifications}
    storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
    storage.put_object(bucket, response_file_name, storage_bytes)

    return True, response


def main():
    """Log the service banner and start consuming requests from the queue."""
    logger.info(load_banner())

    queue_manager = QueueManager(PYINFRA_CONFIG)
    queue_manager.start_consuming(process_request)


if __name__ == "__main__":
    main()