From 06adedac579ccb5a958e5e1199d9de94e12a968e Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 29 Mar 2022 19:50:43 +0200 Subject: [PATCH] reimplemented model loader logic and moved base weights into mlflow run dir --- .dvc/config | 3 +- .gitignore | 2 +- data/.gitignore | 1 + data/base_weights.h5.dvc | 4 - data/mlruns.dvc | 6 +- .../model_loader/loaders/mlflow.py | 233 +++++++++--------- pytest.ini | 4 +- test/unit_tests/conftest.py | 65 ++--- test/unit_tests/model_loader_test.py | 18 +- 9 files changed, 153 insertions(+), 183 deletions(-) create mode 100644 data/.gitignore delete mode 100644 data/base_weights.h5.dvc diff --git a/.dvc/config b/.dvc/config index 9277694..45a3243 100644 --- a/.dvc/config +++ b/.dvc/config @@ -1,5 +1,6 @@ [core] remote = vector + autostage = true ['remote "vector"'] - url = ssh://vector.iqser.com/research/image_service/ + url = ssh://vector.iqser.com/research/image-prediction/ port = 22 diff --git a/.gitignore b/.gitignore index a14b81f..6912504 100644 --- a/.gitignore +++ b/.gitignore @@ -172,4 +172,4 @@ fabric.properties # End of https://www.toptal.com/developers/gitignore/api/linux,pycharm /image_prediction/data/mlruns/ -/data/mlruns/ +#/data/mlruns/ diff --git a/data/.gitignore b/data/.gitignore new file mode 100644 index 0000000..c9213f4 --- /dev/null +++ b/data/.gitignore @@ -0,0 +1 @@ +/mlruns diff --git a/data/base_weights.h5.dvc b/data/base_weights.h5.dvc deleted file mode 100644 index 9f07d13..0000000 --- a/data/base_weights.h5.dvc +++ /dev/null @@ -1,4 +0,0 @@ -outs: -- md5: 6d0186c1f25e889d531788f168fa6cf0 - size: 16727296 - path: base_weights.h5 diff --git a/data/mlruns.dvc b/data/mlruns.dvc index d390fed..1219323 100644 --- a/data/mlruns.dvc +++ b/data/mlruns.dvc @@ -1,5 +1,5 @@ outs: -- md5: d1c708270bab6fcd344d4a8b05d1103d.dir - size: 150225383 - nfiles: 178 +- md5: 4219c52caf5f87f5a94f1ae00c60fb91.dir + size: 166952679 + nfiles: 179 path: mlruns diff --git a/image_prediction/model_loader/loaders/mlflow.py b/image_prediction/model_loader/loaders/mlflow.py index d0b98b9..ada9899 100644 --- a/image_prediction/model_loader/loaders/mlflow.py +++ b/image_prediction/model_loader/loaders/mlflow.py @@ -1,123 +1,110 @@ -# """This module translates between the new ModelLoader API and the inconsistent and historically grown redai model and -# MLflow API as well as the circumstance, that the model artifacts are currently not stored at a single place, due to the -# need of loading the base weights of the pre-trained model, that became apparent at a later point than the design of the -# MLflow storage and MlflowModelReader class; that is why the code in this module is so unclean. In the future, a -# non-adhoc solution should be used that offers a clean API and storage solution. Either implement a well-designed MLflow -# based solution or look into an alternative such as WandB or use a platform solution such as AWS. -# """ -# import importlib -# import json -# import os -# import warnings -# from typing import Mapping -# -# import numpy as np -# from funcy import rcompose -# -# from image_prediction.exceptions import IncorrectInstantiation -# from image_prediction.model_loader.loader import ModelLoader -# -# warnings.filterwarnings("ignore", category=DeprecationWarning, module="pkg_resources") -# -# import mlflow -# -# -# def load_object(object_path): -# path_fragments = object_path.split(".") -# -# module_path = ".".join(path_fragments[:-1]) -# object_name = path_fragments[-1] -# -# module = importlib.import_module(module_path) -# return getattr(module, object_name) -# -# -# def to_local_path(uri): -# return uri[7:] -# -# -# class MlflowModelReader: -# -# def __init__(self, run_id, mlruns_dir=None): -# mlflow.set_tracking_uri(mlruns_dir) -# -# self.run_id = run_id -# self.run = mlflow.get_run(run_id) -# self.artifact_uri = self.__correct_artifact_uri(self.run.info.to_proto().artifact_uri, mlruns_dir) -# -# @staticmethod -# def __correct_artifact_uri(run_artifact_uri, base_path): -# _, suffix = run_artifact_uri.split("mlruns/") -# return os.path.join(base_path, suffix) -# -# def get_weights_path(self, prefix="tt"): -# path = os.path.join(self.artifact_uri, prefix, "train_dev", "estimator", "weights.h5") -# return path -# -# def get_classes(self, prefix="tt"): -# classes = json.loads( -# self.run.data.params[os.path.join(prefix, "train_dev/estimator/classes")].replace("'", '"') -# ) -# return classes -# -# def get_model_handle(self, base_weights=None): -# weights_path = self.get_weights_path() -# model_handle_builder = load_object(self.run.data.params["model_handle_builder"].strip()) -# model_handle = model_handle_builder(self.get_classes(), base_weights=base_weights) -# model_handle.load_top_weights(weights_path) -# return model_handle -# -# -# class PredictionModelHandle: -# """Simplifies usage of ModelHandle instances for prediction purposes.""" -# -# def __init__(self, model_handle, classes_readable: Mapping[int, str]): -# self.__model_handle = model_handle -# self.__classes_readable = classes_readable -# -# @property -# def classes(self): -# return self.__classes_readable -# -# def predict(self, *args, **kwargs): -# predict = rcompose(self.__model_handle.prep_images, self.__model_handle.model.predict) -# return predict(*args, **kwargs) -# -# def predict_proba(self, *args, **kwargs): -# predict = rcompose(self.__model_handle.prep_images, self.__model_handle.model.predict_proba) -# return predict(*args, **kwargs) -# -# -# class MlflowLoader(ModelLoader): -# -# def __init__(self, mlruns_dir): -# self.__mlruns_dir = mlruns_dir -# self._base_weights = None -# -# def load_model(self, run_id, base_weights=None) -> PredictionModelHandle: -# -# # TODO: refac https://stackoverflow.com/questions/42735421/how-to-restrict-object-instantiation-only-via-a-factory-in-python -# if not base_weights: -# -# if not self._base_weights: -# raise IncorrectInstantiation("MlflowReader needs to be initialized via get_model_loader.") -# -# base_weights = self._base_weights -# -# mlflow_reader = MlflowModelReader(run_id, mlruns_dir=self.__mlruns_dir) -# model_handel = mlflow_reader.get_model_handle(base_weights) -# model_handle = model_handel -# classes_readable = self.__load_classes(model_handle) -# -# model = PredictionModelHandle(model_handle, classes_readable) -# -# return model -# -# @staticmethod -# def __load_classes(model_handle): -# -# classes = model_handle.model.classes_ -# classes_readable = np.array(model_handle.classes) -# classes_readable_aligned = classes_readable[classes[list(range(len(classes)))]] -# -# return classes_readable_aligned +"""This module translates between the new ModelLoader API and the inconsistent and historically grown redai model and +MLflow API as well as the circumstance, that the model artifacts are currently not stored at a single place, due to the +need of loading the base weights of the pre-trained model, that became apparent at a later point than the design of the +MLflow storage and MlflowModelReader class; that is why the code in this module is so unclean. In the future, a +non-adhoc solution should be used that offers a clean API and storage solution. Either implement a well-designed MLflow +based solution or look into an alternative such as WandB or use a platform solution such as AWS. +""" + +import os + +os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" + +import importlib +import json +import os +from functools import lru_cache + +from funcy import rcompose + +from image_prediction.model_loader.database.connector import DatabaseConnector + +import mlflow + + +class PredictionModelHandle: + """Simplifies usage of ModelHandle instances for prediction purposes.""" + + def __init__(self, model_handle): + self.__model_handle = model_handle + + def predict(self, *args, **kwargs): + predict = rcompose(self.__model_handle.prep_images, self.__model_handle.model.predict) + return predict(*args, **kwargs) + + def predict_proba(self, *args, **kwargs): + predict = rcompose(self.__model_handle.prep_images, self.__model_handle.model.predict_proba) + return predict(*args, **kwargs) + + +class MlflowModelReader: + + def __init__(self, mlruns_dir=None): + self.mlruns_dir = mlruns_dir + mlflow.set_tracking_uri(self.mlruns_dir) + + @staticmethod + def __correct_artifact_uri(run_artifact_uri, base_path): + _, suffix = run_artifact_uri.split("mlruns/") + return os.path.join(base_path, suffix) + + def __get_weights_path(self, run_id, prefix="tt"): + run = self.__get_run(run_id) + + artifact_uri = self.__correct_artifact_uri(run.info.to_proto().artifact_uri, self.mlruns_dir) + path = os.path.join(artifact_uri, prefix, "train_dev", "estimator") + + base_path = os.path.join(path, "base_weights.h5") + weights_path = os.path.join(path, "weights.h5") + + return base_path, weights_path + + @lru_cache(maxsize=None) + def __get_run(self, run_id): + return mlflow.get_run(run_id) + + def __get_classes(self, run_id, prefix="tt"): + run = self.__get_run(run_id) + + classes = json.loads(run.data.params[os.path.join(prefix, "train_dev/estimator/classes")].replace("'", '"')) + + return classes + + def __get_model_handle(self, run_id): + run = self.__get_run(run_id) + + model_handle_builder = load_object(run.data.params["model_handle_builder"].strip()) + + base_weights_path, weights_path = self.__get_weights_path(run_id) + + model_handle = model_handle_builder(self.__get_classes(run_id), base_weights=base_weights_path) + model_handle.load_top_weights(weights_path) + + return model_handle + + def __get_model(self, run_id) -> PredictionModelHandle: + model_handle = self.__get_model_handle(run_id) + model = PredictionModelHandle(model_handle) + return model + + def __getitem__(self, run_id): + return {"model": self.__get_model(run_id), "classes": self.__get_classes(run_id)} + + +def load_object(object_path): + path_fragments = object_path.split(".") + + module_path = ".".join(path_fragments[:-1]) + object_name = path_fragments[-1] + + module = importlib.import_module(module_path) + return getattr(module, object_name) + + +class MlflowConnector(DatabaseConnector): + + def __init__(self, mlflow_reader: MlflowModelReader): + self.mlflow_reader = mlflow_reader + + def get_object(self, run_id): + return self.mlflow_reader[run_id] diff --git a/pytest.ini b/pytest.ini index 5922a79..bae23b6 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,4 @@ [pytest] -norecursedirs = incl \ No newline at end of file +norecursedirs = incl +filterwarnings = + ignore:.*imp.*:DeprecationWarning diff --git a/test/unit_tests/conftest.py b/test/unit_tests/conftest.py index b420753..a81d7e0 100644 --- a/test/unit_tests/conftest.py +++ b/test/unit_tests/conftest.py @@ -20,6 +20,7 @@ from image_prediction.image_extractor.extractors.mock import ImageExtractorMock from image_prediction.image_extractor.extractors.parsable import ParsablePDFImageExtractor from image_prediction.model_loader.database.connectors.mock import DatabaseConnectorMock from image_prediction.model_loader.loader import ModelLoader +from image_prediction.model_loader.loaders.mlflow import MlflowConnector, MlflowModelReader @pytest.fixture @@ -65,10 +66,6 @@ def estimator_adapter(estimator_type, keras_model, output_batch_generator, monke @pytest.fixture def keras_model(input_size): - import warnings - - warnings.filterwarnings("ignore", category=DeprecationWarning) - import os os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" @@ -173,7 +170,6 @@ def image_metadata_pairs(images, metadata): @pytest.fixture def pdf(image_metadata_pairs): - pdf = fpdf.FPDF(unit="pt") for pair in image_metadata_pairs: @@ -183,7 +179,6 @@ def pdf(image_metadata_pairs): def add_image(pdf, image_metadata_pair): - while fewer_pages_then_required(image_metadata_pair.metadata["page_idx"], pdf): pdf.add_page() @@ -207,29 +202,10 @@ def add_image_to_last_page(pdf: fpdf.fpdf.FPDF, image_metadata_pair): pdf.image(temp_image.name, x=x, y=y, w=w, h=h, type="png") -# @pytest.fixture -# def model_handle_mock(classes, classifier): -# -# class ModelHandleMock: -# -# def __init__(self, classes): -# classifier.classes_ = np.array(list(range(len(classes)))) -# self.classes = classes -# self.model = classifier -# -# return ModelHandleMock(classes) -# -# -# @pytest.fixture -# def prediction_model_handle_mock(model_handle_mock, classes): -# return PredictionModelHandle(model_handle_mock, classes) - - @pytest.fixture def model(): - class Model: - + @staticmethod def predict(*args): return True @@ -239,7 +215,7 @@ def model(): return True return Model() - + @pytest.fixture def model_database_record_identifier(): @@ -257,9 +233,13 @@ def model_database(model_database_record, model_database_record_identifier): @pytest.fixture -def database_connector(database_type, model_database): +def database_connector(database_type, model_database, mlflow_reader): if database_type == "mock": return DatabaseConnectorMock(model_database) + + elif database_type == "mlflow": + return MlflowConnector(mlflow_reader) + else: raise UnknownDatabaseType(f"No connector for database type {database_type} was specified.") @@ -269,17 +249,18 @@ def model_loader(database_connector): return ModelLoader(database_connector) -# @pytest.fixture -# def model_loader(loader_type, monkeypatch, model_handle_mock, classes): -# if loader_type == "mock": -# loader = ModelLoaderMock() -# monkeypatch.setattr(loader, "model", model_handle_mock) -# -# # elif loader_type == "mlflow": -# # loader = get_mlflow_loader() -# # monkeypatch.setattr(loader, "_model_handle", model_handle_mock) -# -# else: -# raise UnknownModelLoader(f"No model loader for type {loader_type} was specified.") -# -# return loader +@pytest.fixture +def mlflow_run_id(): + from image_prediction.config import CONFIG + return CONFIG.service.run_id + + +@pytest.fixture +def mlruns_dir(): + from image_prediction.locations import MLRUNS_DIR + return MLRUNS_DIR + + +@pytest.fixture +def mlflow_reader(mlruns_dir): + return MlflowModelReader(mlruns_dir) diff --git a/test/unit_tests/model_loader_test.py b/test/unit_tests/model_loader_test.py index ecba32b..0216e36 100644 --- a/test/unit_tests/model_loader_test.py +++ b/test/unit_tests/model_loader_test.py @@ -1,14 +1,7 @@ -import numpy as np import pytest +from image_prediction.model_loader.loaders.mlflow import PredictionModelHandle -# @pytest.mark.parametrize("loader_type", ["mock"]) -# @pytest.mark.parametrize("estimator_type", ["mock"]) -# @pytest.mark.parametrize("batch_size", [3]) -# def test_load_model_and_classes(model_loader, model_handle_mock, classes): -# model_loaded, classes_loaded = model_loader.load_model_and_classes("an identifier") -# assert model_loaded == model_handle_mock -# assert np.all(classes_loaded == classes) @pytest.mark.parametrize("database_type", ["mock"]) def test_load_model_and_classes(model_loader, model_database_record_identifier, model, classes): @@ -17,3 +10,12 @@ def test_load_model_and_classes(model_loader, model_database_record_identifier, assert model_loaded == model assert classes_loaded == classes + + +@pytest.mark.parametrize("database_type", ["mlflow"]) +def test_load_model_and_classes_from_mlflow_store(model_loader, mlflow_run_id): + model_loaded = model_loader.load_model(mlflow_run_id) + classes_loaded = model_loader.load_classes(mlflow_run_id) + + assert type(model_loaded) == PredictionModelHandle + assert classes_loaded == ['formula', 'logo', 'other', 'signature']