added log messages to all pipelien components; converting pipelien output to list for REST transport; refactoring; added e2e test (flask + pipeline)... but hangs

This commit is contained in:
Matthias Bisping 2022-04-02 02:44:30 +02:00
parent e8d0299e46
commit 5c23898280
18 changed files with 157 additions and 98 deletions

View File

@ -4,7 +4,7 @@ webserver:
mode: $SERVER_MODE|production # webserver mode: {development, production}
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger
logging_level: DEBUG # Logging level for service logger
progressbar: True # Whether a progress bar over the pages of a document is displayed while processing
batch_size: $BATCH_SIZE|32 # Number of images in memory simultaneously
verbose: $VERBOSE|True # Service prints document processing progress to stdout

View File

@ -6,6 +6,9 @@ from funcy import rcompose
from image_prediction.estimator.adapter.adapter import EstimatorAdapter
from image_prediction.label_mapper.mapper import LabelMapper
from image_prediction.utils import get_logger
logger = get_logger()
class Classifier:
@ -27,4 +30,5 @@ class Classifier:
return list(self.__pipe(batch))
def __call__(self, batch: np.array) -> List[str]:
logger.debug("Classifier.predicting")
return self.predict(batch)

View File

@ -7,7 +7,9 @@ from funcy import rcompose
from image_prediction.classifier.classifier import Classifier
from image_prediction.estimator.preprocessor.preprocessor import Preprocessor
from image_prediction.estimator.preprocessor.preprocessors.identity import IdentityPreprocessor
from image_prediction.utils import chunk_iterable
from image_prediction.utils import chunk_iterable, get_logger
logger = get_logger()
class ImageClassifier:
@ -26,4 +28,5 @@ class ImageClassifier:
return predictions
def __call__(self, images: Iterable[Image], batch_size=16):
logger.debug("ImageClassifier.predict")
yield from self.predict(images, batch_size=batch_size)

View File

@ -1,6 +1,9 @@
from funcy import rcompose
from image_prediction.transformer.transformer import Transformer
from image_prediction.utils import get_logger
logger = get_logger()
class TransformerCompositor(Transformer):
@ -9,4 +12,5 @@ class TransformerCompositor(Transformer):
self.pipe = rcompose(*formatters)
def transform(self, obj):
logger.debug("TransformerCompositor.transform")
return self.pipe(obj)

View File

@ -1,8 +1,17 @@
from image_prediction.config import CONFIG
from image_prediction.locations import MLRUNS_DIR
from funcy import juxt
from image_prediction.classifier.classifier import Classifier
from image_prediction.classifier.image_classifier import ImageClassifier
from image_prediction.compositor.compositor import TransformerCompositor
from image_prediction.estimator.adapter.adapter import EstimatorAdapter
from image_prediction.extractor_classifier.extractor_classifier import ExtractorClassifier
from image_prediction.formatter.formatters.camel_case import Snake2CamelCaseKeyFormatter
from image_prediction.formatter.formatters.enum import EnumFormatter
from image_prediction.formatter.formatters.response import ResponseTransformer
from image_prediction.image_extractor.extractors.parsable import ParsablePDFImageExtractor
from image_prediction.label_mapper.mappers.probability import ProbabilityMapper
from image_prediction.model_loader.loader import ModelLoader
from image_prediction.model_loader.loaders.mlflow import MlflowConnector
from image_prediction.pipeline import Pipeline
from image_prediction.redai_adapter.mlflow import MlflowModelReader
@ -11,10 +20,25 @@ def get_mlflow_model_loader(mlruns_dir):
return model_loader
def load_pipeline(**kwargs):
model_loader = get_mlflow_model_loader(MLRUNS_DIR)
model_identifier = CONFIG.service.run_id
def get_image_classifier(model_loader, model_identifier):
model, classes = juxt(model_loader.load_model, model_loader.load_classes)(model_identifier)
return ImageClassifier(Classifier(EstimatorAdapter(model), ProbabilityMapper(classes)))
pipeline = Pipeline(model_loader, model_identifier, **kwargs)
return pipeline
def get_extractor(**kwargs):
image_extractor = ParsablePDFImageExtractor(**kwargs)
return image_extractor
def get_extractor_classifier(model_loader, model_identifier, **kwargs):
extractor_classifier = ExtractorClassifier(
get_extractor(**kwargs), get_image_classifier(model_loader, model_identifier)
)
return extractor_classifier
def get_formatter():
formatter = TransformerCompositor(EnumFormatter(), ResponseTransformer(), Snake2CamelCaseKeyFormatter())
return formatter

View File

@ -1,3 +1,8 @@
from image_prediction.utils import get_logger
logger = get_logger()
class EstimatorAdapter:
def __init__(self, estimator):
self.estimator = estimator
@ -6,4 +11,5 @@ class EstimatorAdapter:
return self.estimator(batch)
def __call__(self, batch):
logger.debug("EstimatorAdapter.predict")
return self.predict(batch)

View File

@ -1,6 +1,9 @@
from enum import Enum
from image_prediction.formatter.formatters.key_formatter import KeyFormatter
from image_prediction.utils import get_logger
logger = get_logger()
class EnumFormatter(KeyFormatter):
@ -8,4 +11,5 @@ class EnumFormatter(KeyFormatter):
return key.value if isinstance(key, Enum) else key
def transform(self, obj):
logger.debug("EnumFormatter.transform")
raise NotImplementedError

View File

@ -3,10 +3,14 @@ from operator import itemgetter
from image_prediction.config import CONFIG
from image_prediction.transformer.transformer import Transformer
from image_prediction.utils import get_logger
logger = get_logger()
class ResponseTransformer(Transformer):
def transform(self, data):
logger.debug("ResponseTransformer.transform")
try:
return build_image_info(data)
except TypeError:

View File

@ -2,8 +2,12 @@ import abc
from collections import namedtuple
from typing import Iterable
from image_prediction.utils import get_logger
ImageMetadataPair = namedtuple("ImageMetadataPair", ["image", "metadata"])
logger = get_logger()
class ImageExtractor(abc.ABC):
@abc.abstractmethod
@ -11,4 +15,5 @@ class ImageExtractor(abc.ABC):
raise NotImplementedError
def __call__(self, obj):
logger.debug("ImageExtractor.extract")
return self.extract(obj)

View File

@ -9,9 +9,6 @@ from tqdm import tqdm
from image_prediction.image_extractor.extractor import ImageExtractor, ImageMetadataPair
from image_prediction.info import Info
from image_prediction.utils import get_logger
logger = get_logger()
class ParsablePDFImageExtractor(ImageExtractor):
@ -50,8 +47,6 @@ class ParsablePDFImageExtractor(ImageExtractor):
return starmap(ImageMetadataPair, zip(images, metadata))
def extract(self, pdf: bytes):
logger.debug("Extracting")
self.doc = fitz.Document(stream=pdf)
image_metadata_pairs = chain.from_iterable(

View File

@ -1,44 +1,21 @@
import os
from funcy import rcompose, juxt
from funcy import rcompose
from image_prediction.classifier.classifier import Classifier
from image_prediction.classifier.image_classifier import ImageClassifier
from image_prediction.compositor.compositor import TransformerCompositor
from image_prediction.estimator.adapter.adapter import EstimatorAdapter
from image_prediction.extractor_classifier.extractor_classifier import ExtractorClassifier
from image_prediction.formatter.formatters.camel_case import Snake2CamelCaseKeyFormatter
from image_prediction.formatter.formatters.enum import EnumFormatter
from image_prediction.formatter.formatters.response import ResponseTransformer
from image_prediction.image_extractor.extractors.parsable import ParsablePDFImageExtractor
from image_prediction.label_mapper.mappers.probability import ProbabilityMapper
from image_prediction.config import CONFIG
from image_prediction.default_objects import get_extractor_classifier, get_formatter, get_mlflow_model_loader
from image_prediction.locations import MLRUNS_DIR
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
def get_image_classifier(model_loader, model_identifier):
model, classes = juxt(model_loader.load_model, model_loader.load_classes)(model_identifier)
return ImageClassifier(Classifier(EstimatorAdapter(model), ProbabilityMapper(classes)))
def load_pipeline(**kwargs):
model_loader = get_mlflow_model_loader(MLRUNS_DIR)
model_identifier = CONFIG.service.run_id
pipeline = Pipeline(model_loader, model_identifier, **kwargs)
def get_extractor(**kwargs):
image_extractor = ParsablePDFImageExtractor(**kwargs)
return image_extractor
def get_extractor_classifier(model_loader, model_identifier, **kwargs):
extractor_classifier = ExtractorClassifier(
get_extractor(**kwargs), get_image_classifier(model_loader, model_identifier)
)
return extractor_classifier
def get_formatter():
formatter = TransformerCompositor(EnumFormatter(), ResponseTransformer(), Snake2CamelCaseKeyFormatter())
return formatter
return pipeline
class Pipeline:

View File

@ -5,19 +5,19 @@ from image_prediction.config import CONFIG
def make_logger_getter():
logger = logging.getLogger("imclf")
logger.propagate = False
handler = logging.StreamHandler()
handler.setLevel(CONFIG.service.logging_level)
log_format = "[%(levelname)s]: %(message)s"
formatter = logging.Formatter(log_format)
handler.setFormatter(formatter)
logger.addHandler(handler)
def get_logger():
logger = logging.getLogger("imclf")
logger.propagate = False
handler = logging.StreamHandler()
handler.setLevel(CONFIG.service.logging_level)
log_format = "[%(levelname)s]: %(message)s"
formatter = logging.Formatter(log_format)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
return get_logger

View File

@ -1,8 +1,10 @@
import argparse
import json
from image_prediction.default_objects import load_pipeline
from image_prediction.config import CONFIG
from image_prediction.pipeline import load_pipeline
from image_prediction.utils import get_logger
import logging
def parse_args():
parser = argparse.ArgumentParser()
@ -14,18 +16,21 @@ def parse_args():
def main(args):
pipeline = load_pipeline()
pipeline = load_pipeline(verbose=True)
with open(args.pdf, "rb") as f:
predictions = pipeline(f.read())
with open("/tmp/f2dc689ca794fccb8cd38b95f2bf6ba9_predictions.json", "w") as f:
json.dump(list(predictions), f, indent=2)
for prd in predictions:
print(json.dumps(prd, indent=2))
if __name__ == "__main__":
# logging.basicConfig(level=logging.DEBUG)
logger = get_logger()
logger.info("111111111111111111111111111111111")
print(logger.level)
print(logging.DEBUG)
args = parse_args()
main(args)
# main(args)

View File

@ -3,8 +3,8 @@ import logging
from waitress import serve
from image_prediction.config import CONFIG
from image_prediction.default_objects import load_pipeline
from image_prediction.flask import make_prediction_server
from image_prediction.pipeline import load_pipeline
from image_prediction.utils import get_logger
from image_prediction.utils.banner import show_banner
@ -19,7 +19,7 @@ def main():
logger.debug("Loading pipeline...")
pipeline = load_pipeline(verbose=CONFIG.service.verbose)
logger.debug("Running pipeline...")
return pipeline(pdf)
return list(pipeline(pdf))
prediction_server = make_prediction_server(predict)
serve(prediction_server, host=CONFIG.webserver.host, port=CONFIG.webserver.port, _quiet=False)

View File

@ -1,4 +1,5 @@
import random
import json
import os
import random
import string
import tempfile
@ -28,9 +29,11 @@ from image_prediction.image_extractor.extractors.parsable import ParsablePDFImag
from image_prediction.info import Info
from image_prediction.label_mapper.mappers.numeric import IndexMapper
from image_prediction.label_mapper.mappers.probability import ProbabilityMapper, ProbabilityMapperKeys
from image_prediction.locations import TEST_DATA_DIR
from image_prediction.model_loader.database.connectors.mock import DatabaseConnectorMock
from image_prediction.model_loader.loader import ModelLoader
from image_prediction.model_loader.loaders.mlflow import MlflowConnector
from image_prediction.pipeline import load_pipeline
from image_prediction.redai_adapter.mlflow import MlflowModelReader
from image_prediction.redai_adapter.model import PredictionModelHandle
@ -92,7 +95,7 @@ def label_format(request):
@pytest.fixture
def expected_predictions_mapped(
label_format, batch_of_expected_string_labels, batch_of_expected_label_to_probability_mappings
label_format, batch_of_expected_string_labels, batch_of_expected_label_to_probability_mappings
):
if label_format == "index":
return batch_of_expected_string_labels
@ -114,7 +117,7 @@ def expected_predictions(label_format, batch_of_expected_numeric_labels, batch_o
@pytest.fixture
def estimator_adapter(
estimator_type, estimator_mock, keras_model, model_handle_mock, output_batch_generator, monkeypatch
estimator_type, estimator_mock, keras_model, model_handle_mock, output_batch_generator, monkeypatch
):
if estimator_type == "mock":
estimator_adapter = EstimatorAdapter(estimator_mock)
@ -412,3 +415,21 @@ def model_handle_mock(estimator_mock):
return [None for _ in batch]
return ModelHandleMock()
@pytest.fixture
def real_pdf():
with open(os.path.join(TEST_DATA_DIR, "f2dc689ca794fccb8cd38b95f2bf6ba9.pdf"), "rb") as f:
yield f.read()
@pytest.fixture
def real_expected_service_response():
with open(os.path.join(TEST_DATA_DIR, "f2dc689ca794fccb8cd38b95f2bf6ba9_predictions.json"), "r") as f:
yield json.load(f)
@pytest.fixture
def pipeline():
pipeline = load_pipeline(verbose=True)
return pipeline

View File

@ -3,7 +3,7 @@ from multiprocessing import Process
import pytest
import requests
from funcy import retry
from funcy import retry, compose
from waitress import serve
from image_prediction.flask import make_prediction_server
@ -30,10 +30,22 @@ def url(host, port):
return f"http://{host}:{port}"
@pytest.fixture(params=["dummy", "actual"])
def server_type(request):
return request.param
@pytest.fixture
def server():
server = make_prediction_server(lambda _: 42)
return server
def server(server_type, pipeline):
if server_type == "dummy":
return make_prediction_server(lambda x: int(x.decode()) // 2)
elif server_type == "actual":
return make_prediction_server(compose(list, pipeline))
else:
raise ValueError(f"Unknown server type {server_type}.")
@pytest.fixture
@ -64,17 +76,27 @@ def server_process(server, host_and_port, url):
server.close()
def test_server_predict(url):
response = requests.post(f"{url}/predict")
@pytest.mark.parametrize("server_type", ["actual"])
def test_server_predict(url, real_pdf, real_expected_service_response):
response = requests.post(f"{url}/predict", data=real_pdf)
response.raise_for_status()
assert response.json() == 42
assert response.json() == real_expected_service_response
@pytest.mark.parametrize("server_type", ["dummy"])
def test_server_dummy_operation(url):
response = requests.post(f"{url}/predict", data=b"42")
response.raise_for_status()
assert response.json() == 21
@pytest.mark.parametrize("server_type", ["dummy"])
def test_server_health_check(url):
response = requests.get(f"{url}/health")
response.raise_for_status()
assert response.status_code == 200
@pytest.mark.parametrize("server_type", ["dummy"])
def test_server_ready_check(url):
assert server_ready(url)

View File

@ -7,7 +7,7 @@ from image_prediction.flask import make_prediction_server
from image_prediction.utils import get_logger
logger = get_logger()
logger.setLevel(logging.CRITICAL + 1)
# logger.setLevel(logging.CRITICAL + 1)
def predict_fn(x: bytes):
@ -15,7 +15,7 @@ def predict_fn(x: bytes):
if x == 42:
return True
else:
raise Exception("intentional test exception")
raise RuntimeError("intentional test exception")
@pytest.fixture

View File

@ -1,18 +1,3 @@
import json
import os
from image_prediction.default_objects import load_pipeline
from image_prediction.locations import TEST_DATA_DIR
def test_pipeline():
pipeline = load_pipeline(verbose=False)
with open(os.path.join(TEST_DATA_DIR, "f2dc689ca794fccb8cd38b95f2bf6ba9.pdf"), "rb") as f:
predictions = list(pipeline(f.read()))
with open(os.path.join(TEST_DATA_DIR, "f2dc689ca794fccb8cd38b95f2bf6ba9_predictions.json"), "r") as f:
expectations = json.load(f)
assert predictions == expectations
def test_pipeline(pipeline, real_pdf, real_expected_service_response):
response = list(pipeline(real_pdf))
assert response == real_expected_service_response