refactoring; testing of prediction model handel redai adapter

This commit is contained in:
Matthias Bisping 2022-03-30 19:01:54 +02:00
parent cc8d87338c
commit 20718996bd
5 changed files with 43 additions and 189 deletions

View File

@ -1,122 +0,0 @@
from itertools import chain
from operator import itemgetter
from typing import List, Dict, Iterable
import numpy as np
from image_prediction.config import CONFIG
from image_prediction.locations import MLRUNS_DIR, BASE_WEIGHTS
from image_prediction.utils import temporary_pdf_file, get_logger
from incl.redai_image.redai.redai.backend.model.model_handle import ModelHandle
from incl.redai_image.redai.redai.backend.pdf.image_extraction import extract_and_stitch
from incl.redai_image.redai.redai.utils.mlflow_reader import MlflowModelReader
from incl.redai_image.redai.redai.utils.shared import chunk_iterable
logger = get_logger()
class Predictor:
"""`ModelHandle` wrapper. Forwards to wrapped service_estimator handle for prediction and produces structured output that is
interpretable independently of the wrapped service_estimator (e.g. with regard to a .classes_ attribute).
"""
def __init__(self, model_handle: ModelHandle = None):
"""Initializes a ServiceEstimator.
Args:
model_handle: ModelHandle object to forward to for prediction. By default, a service_estimator handle is loaded from the
mlflow database via CONFIG.service.run_id.
"""
try:
if model_handle is None:
reader = MlflowModelReader(run_id=CONFIG.service.run_id, mlruns_dir=MLRUNS_DIR)
self.model_handle = reader.get_model_handle(BASE_WEIGHTS)
else:
self.model_handle = model_handle
self.classes = self.model_handle.model.classes_
self.classes_readable = np.array(self.model_handle.classes)
self.classes_readable_aligned = self.classes_readable[self.classes[list(range(len(self.classes)))]]
except Exception as e:
logger.info(f"Service estimator initialization failed: {e}")
def __make_predictions_human_readable(self, probs: np.ndarray) -> List[Dict[str, float]]:
"""Translates an n x m matrix of probabilities over classes into an n-element list of mappings from classes to
probabilities.
Args:
probs: probability matrix (items x classes)
Returns:
list of mappings from classes to probabilities.
"""
classes = np.argmax(probs, axis=1)
classes = self.classes[classes]
classes_readable = [self.model_handle.classes[c] for c in classes]
return classes_readable
def predict(self, images: List, probabilities: bool = False, **kwargs):
"""Gathers predictions for list of images. Assigns each image a class and optionally a probability distribution
over all classes.
Args:
images (List[PIL.Image]) : Images to gather predictions for.
probabilities: Whether to return dictionaries of the following form instead of strings:
{
"class": predicted class,
"probabilities": {
"class 1" : class 1 probability,
"class 2" : class 2 probability,
...
}
}
Returns:
By default the return value is a list of classes (meaningful class name strings). Alternatively a list of
dictionaries with an additional probability field for estimated class probabilities per image can be
returned.
"""
X = self.model_handle.prep_images(list(images))
probs_per_item = self.model_handle.model.predict_proba(X, **kwargs).astype(float)
classes = self.__make_predictions_human_readable(probs_per_item)
class2prob_per_item = [dict(zip(self.classes_readable_aligned, probs)) for probs in probs_per_item]
class2prob_per_item = [
dict(sorted(c2p.items(), key=itemgetter(1), reverse=True)) for c2p in class2prob_per_item
]
predictions = [{"class": c, "probabilities": c2p} for c, c2p in zip(classes, class2prob_per_item)]
return predictions if probabilities else classes
def predict_pdf(self, pdf, verbose=False):
with temporary_pdf_file(pdf) as pdf_path:
image_metadata_pairs = self.__extract_image_metadata_pairs(pdf_path, verbose=verbose)
return self.__predict_images(image_metadata_pairs)
def __predict_images(self, image_metadata_pairs: Iterable, batch_size: int = CONFIG.service.batch_size):
def process_chunk(chunk):
images, metadata = zip(*chunk)
predictions = self.predict(images, probabilities=True)
return predictions, metadata
def predict(image_metadata_pair_generator):
chunks = chunk_iterable(image_metadata_pair_generator, n=batch_size)
return map(chain.from_iterable, zip(*map(process_chunk, chunks)))
try:
predictions, metadata = predict(image_metadata_pairs)
return predictions, metadata
except ValueError:
return [], []
@staticmethod
def __extract_image_metadata_pairs(pdf_path: str, **kwargs):
def image_is_large_enough(metadata: dict):
x1, x2, y1, y2 = itemgetter("x1", "x2", "y1", "y2")(metadata)
return abs(x1 - x2) > 2 and abs(y1 - y2) > 2
yield from extract_and_stitch(pdf_path, convert_to_rgb=True, filter_fn=image_is_large_enough, **kwargs)

View File

@ -5,15 +5,10 @@ class PredictionModelHandle:
"""Simplifies usage of ModelHandle instances for prediction purposes."""
def __init__(self, model_handle):
self.__model_handle = model_handle
self.__predict = rcompose(self.__model_handle.prep_images, self.__model_handle.model.predict)
self.__predict_proba = rcompose(self.__model_handle.prep_images, self.__model_handle.model.predict_proba)
self.__predict = rcompose(model_handle.prep_images, model_handle.model.predict_proba)
def predict(self, *args, **kwargs):
return self.__predict(*args, **kwargs)
def predict_proba(self, *args, **kwargs):
return self.__predict_proba(*args, **kwargs)
def __call__(self, *args, **kwargs):
return self.predict_proba(*args, **kwargs)
return self.predict(*args, **kwargs)

View File

@ -1,49 +0,0 @@
import logging
from waitress import serve
from image_prediction.config import CONFIG
from image_prediction.flask import make_prediction_server
from image_prediction.predictor import Predictor
from image_prediction.response import build_response
from image_prediction.utils import get_logger, show_banner
logger = get_logger()
def main():
def predict(pdf):
# Keras service_estimator.predict stalls when service_estimator was loaded in different process
# https://stackoverflow.com/questions/42504669/keras-tensorflow-and-multiprocessing-in-python
predictor = Predictor()
predictions, metadata = predictor.predict_pdf(pdf, verbose=CONFIG.service.progressbar)
response = build_response(predictions, metadata)
return response
logger.info("Predictor ready.")
prediction_server = make_prediction_server(predict)
run_prediction_server(prediction_server, mode=CONFIG.webserver.mode)
def run_prediction_server(app, mode="development"):
if mode == "development":
app.run(host=CONFIG.webserver.host, port=CONFIG.webserver.port, debug=True)
elif mode == "production":
serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port)
if __name__ == "__main__":
logging_level = CONFIG.service.logging_level
logging.basicConfig(level=logging_level)
logging.getLogger("flask").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("werkzeug").setLevel(logging.ERROR)
logging.getLogger("waitress").setLevel(logging.ERROR)
logging.getLogger("PIL").setLevel(logging.ERROR)
logging.getLogger("h5py").setLevel(logging.ERROR)
show_banner()
main()

View File

@ -1,7 +1,7 @@
import pytest
@pytest.mark.parametrize("estimator_type", ["mock", "keras"])
@pytest.mark.parametrize("estimator_type", ["mock", "keras", "redai"])
@pytest.mark.parametrize("label_format", ["index", "probability"])
def test_classifier(classifier, input_batch, expected_predictions_mapped):
predictions = classifier(input_batch)

View File

@ -25,6 +25,7 @@ from image_prediction.model_loader.database.connectors.mock import DatabaseConne
from image_prediction.model_loader.loader import ModelLoader
from image_prediction.model_loader.loaders.mlflow import MlflowConnector
from image_prediction.redai_adapter.mlflow import MlflowModelReader
from image_prediction.redai_adapter.model import PredictionModelHandle
@pytest.fixture
@ -50,13 +51,21 @@ def classifier(estimator_adapter, label_mapper):
return classifier
class EstimatorMock:
@staticmethod
def predict(batch):
return [None for _ in batch]
@pytest.fixture
def estimator_mock():
class EstimatorMock:
@staticmethod
def predict(batch):
return [None for _ in batch]
def __call__(self, batch):
return self.predict(batch)
@staticmethod
def predict_proba(batch):
return [None for _ in batch]
def __call__(self, batch):
return self.predict(batch)
return EstimatorMock()
@pytest.fixture
@ -99,11 +108,15 @@ def expected_predictions(
@pytest.fixture
def estimator_adapter(estimator_type, keras_model, output_batch_generator, monkeypatch):
def estimator_adapter(
estimator_type, estimator_mock, keras_model, model_handle_mock, output_batch_generator, monkeypatch
):
if estimator_type == "mock":
estimator_adapter = EstimatorAdapter(EstimatorMock())
estimator_adapter = EstimatorAdapter(estimator_mock)
elif estimator_type == "keras":
estimator_adapter = EstimatorAdapter(keras_model)
elif estimator_type == "redai":
estimator_adapter = EstimatorAdapter(PredictionModelHandle(model_handle_mock))
else:
raise UnknownEstimatorAdapter(f"No adapter for estimator type {estimator_type} was specified.")
@ -182,7 +195,6 @@ def batch_of_expected_numeric_labels(batch_size, classes):
@pytest.fixture
def batch_of_expected_label_to_probability_mappings(batch_of_expected_probability_arrays, classes):
def map_probabilities(probabilities):
lbl2prob = dict(sorted(zip(classes, probabilities), key=itemgetter(1), reverse=True))
most_likely = [*lbl2prob][0]
@ -250,7 +262,6 @@ def info_label_map():
@pytest.fixture
def metadata_formatted(metadata):
def format_metadata(metadata):
return {key.value: val for key, val in metadata.items()}
@ -358,3 +369,22 @@ def mlruns_dir():
@pytest.fixture
def mlflow_reader(mlruns_dir):
return MlflowModelReader(mlruns_dir)
@pytest.fixture
def model_handle_mock(estimator_mock):
class ModelHandleMock:
def __init__(self):
self.model = estimator_mock
def prep_images(self, batch):
return [None for _ in batch]
def predict(self, batch):
return [None for _ in batch]
def predict_proba(self, batch):
return [None for _ in batch]
return ModelHandleMock()