RED-4206 wrap queue callback in process to manage memory allocation with the operating system and force deallocation after processing.
This commit is contained in:
parent
48dd52131d
commit
3dfe7b8618
@ -1,38 +1,14 @@
|
|||||||
import multiprocessing
|
|
||||||
import traceback
|
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
from flask import Flask, request, jsonify
|
from flask import Flask, request, jsonify
|
||||||
from prometheus_client import generate_latest, CollectorRegistry, Summary
|
from prometheus_client import generate_latest, CollectorRegistry, Summary
|
||||||
|
|
||||||
from image_prediction.utils import get_logger
|
from image_prediction.utils import get_logger
|
||||||
|
from image_prediction.utils.process_wrapping import wrap_in_process
|
||||||
|
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
|
|
||||||
|
|
||||||
def run_in_process(func):
|
|
||||||
p = multiprocessing.Process(target=func)
|
|
||||||
p.start()
|
|
||||||
p.join()
|
|
||||||
|
|
||||||
|
|
||||||
def wrap_in_process(func_to_wrap):
|
|
||||||
def build_function_and_run_in_process(*args, **kwargs):
|
|
||||||
def func():
|
|
||||||
try:
|
|
||||||
result = func_to_wrap(*args, **kwargs)
|
|
||||||
return_dict["result"] = result
|
|
||||||
except:
|
|
||||||
logger.error(traceback.format_exc())
|
|
||||||
|
|
||||||
manager = multiprocessing.Manager()
|
|
||||||
return_dict = manager.dict()
|
|
||||||
run_in_process(func)
|
|
||||||
return return_dict.get("result", None)
|
|
||||||
|
|
||||||
return build_function_and_run_in_process
|
|
||||||
|
|
||||||
|
|
||||||
def make_prediction_server(predict_fn: Callable):
|
def make_prediction_server(predict_fn: Callable):
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
registry = CollectorRegistry(auto_describe=True)
|
registry = CollectorRegistry(auto_describe=True)
|
||||||
|
|||||||
25
image_prediction/utils/process_wrapping.py
Normal file
25
image_prediction/utils/process_wrapping.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import logging
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger("main")
|
||||||
|
|
||||||
|
|
||||||
|
def wrap_in_process(fn):
|
||||||
|
manager = multiprocessing.Manager()
|
||||||
|
return_queue = manager.list()
|
||||||
|
|
||||||
|
def process_fn(*args, **kwargs):
|
||||||
|
return_queue.append(fn(*args, **kwargs))
|
||||||
|
|
||||||
|
def wrapped_fn(*args, **kwargs):
|
||||||
|
logger.debug("Starting new subprocess")
|
||||||
|
process = multiprocessing.Process(target=process_fn, args=args, kwargs=kwargs)
|
||||||
|
process.start()
|
||||||
|
process.join()
|
||||||
|
try:
|
||||||
|
return return_queue.pop(0)
|
||||||
|
except IndexError:
|
||||||
|
logger.warning("No results returned by subprocess.")
|
||||||
|
|
||||||
|
return wrapped_fn
|
||||||
@ -7,6 +7,7 @@ from image_prediction.config import Config
|
|||||||
from image_prediction.locations import CONFIG_FILE
|
from image_prediction.locations import CONFIG_FILE
|
||||||
from image_prediction.pipeline import load_pipeline
|
from image_prediction.pipeline import load_pipeline
|
||||||
from image_prediction.utils.banner import load_banner
|
from image_prediction.utils.banner import load_banner
|
||||||
|
from image_prediction.utils.process_wrapping import wrap_in_process
|
||||||
from pyinfra import config
|
from pyinfra import config
|
||||||
from pyinfra.queue.queue_manager import QueueManager
|
from pyinfra.queue.queue_manager import QueueManager
|
||||||
from pyinfra.storage.storage import get_storage
|
from pyinfra.storage.storage import get_storage
|
||||||
@ -19,6 +20,12 @@ logger = logging.getLogger("main")
|
|||||||
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
|
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
|
||||||
|
|
||||||
|
|
||||||
|
# A component of the callback (probably tensorflow) does not release allocated memory (see RED-4206).
|
||||||
|
# See: https://stackoverflow.com/questions/39758094/clearing-tensorflow-gpu-memory-after-model-execution
|
||||||
|
# Workaround: Manage Memory with the operating system, by wrapping the callback in a sub-process.
|
||||||
|
# FIXME: Find more fine-grained solution or if the problem occurs persistently for python services,
|
||||||
|
# FIXME: move the process wrapper to a general module (see RED-4929).
|
||||||
|
@wrap_in_process
|
||||||
def process_request(request_message):
|
def process_request(request_message):
|
||||||
dossier_id = request_message["dossierId"]
|
dossier_id = request_message["dossierId"]
|
||||||
file_id = request_message["fileId"]
|
file_id = request_message["fileId"]
|
||||||
|
|||||||
24
test/unit_tests/process_wrapping_test.py
Normal file
24
test/unit_tests/process_wrapping_test.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from image_prediction.utils.process_wrapping import wrap_in_process
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def process_fn_mock():
|
||||||
|
def _process(a: str, b: float, c: dict, d: list):
|
||||||
|
return a * 2, b + 3, c, d[0]
|
||||||
|
|
||||||
|
return _process
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def parameter():
|
||||||
|
return {"a": "A", "b": 0.42, "c": {"x": 1, "y": 2}, "d": [1, 2, 3]}
|
||||||
|
|
||||||
|
|
||||||
|
def test_process_wrapper_with_args(process_fn_mock, parameter):
|
||||||
|
assert process_fn_mock(*parameter.values()) == wrap_in_process(process_fn_mock)(*parameter.values())
|
||||||
|
|
||||||
|
|
||||||
|
def test_process_wrapper_with_kwargs(process_fn_mock, parameter):
|
||||||
|
assert process_fn_mock(**parameter) == wrap_in_process(process_fn_mock)(**parameter)
|
||||||
Loading…
x
Reference in New Issue
Block a user