diff --git a/image_prediction/classifier/classifier.py b/image_prediction/classifier/classifier.py index b537aa1..8e752cf 100644 --- a/image_prediction/classifier/classifier.py +++ b/image_prediction/classifier/classifier.py @@ -24,10 +24,11 @@ class Classifier: self.__pipe = rcompose(self.__estimator_adapter, self.__label_mapper) def predict(self, batch: Union[np.array, Tuple[Image]]) -> List[str]: - if not isinstance(batch, tuple) and batch.shape[0] == 0: + + if isinstance(batch, np.ndarray) and batch.shape[0] == 0: return [] - return list(self.__pipe(batch)) + return self.__pipe(batch) def __call__(self, batch: np.array) -> List[str]: logger.debug("Classifier.predict") diff --git a/image_prediction/default_objects.py b/image_prediction/default_objects.py index 16ad466..97c089e 100644 --- a/image_prediction/default_objects.py +++ b/image_prediction/default_objects.py @@ -4,16 +4,15 @@ from image_prediction.classifier.classifier import Classifier from image_prediction.classifier.image_classifier import ImageClassifier from image_prediction.compositor.compositor import TransformerCompositor from image_prediction.estimator.adapter.adapter import EstimatorAdapter -from image_prediction.extractor_classifier.extractor_classifier import ExtractorClassifier from image_prediction.formatter.formatters.camel_case import Snake2CamelCaseKeyFormatter from image_prediction.formatter.formatters.enum import EnumFormatter -from image_prediction.transformer.transformers.response import ResponseTransformer from image_prediction.image_extractor.extractors.parsable import ParsablePDFImageExtractor from image_prediction.label_mapper.mappers.probability import ProbabilityMapper from image_prediction.model_loader.loader import ModelLoader from image_prediction.model_loader.loaders.mlflow import MlflowConnector from image_prediction.redai_adapter.mlflow import MlflowModelReader from image_prediction.transformer.transformers.coordinate.pdfnet import PDFNetCoordinateTransformer +from image_prediction.transformer.transformers.response import ResponseTransformer def get_mlflow_model_loader(mlruns_dir): @@ -32,14 +31,6 @@ def get_extractor(**kwargs): return image_extractor -def get_extractor_classifier(model_loader, model_identifier, **kwargs): - extractor_classifier = ExtractorClassifier( - get_extractor(**kwargs), get_image_classifier(model_loader, model_identifier) - ) - - return extractor_classifier - - def get_formatter(): formatter = TransformerCompositor( PDFNetCoordinateTransformer(), EnumFormatter(), ResponseTransformer(), Snake2CamelCaseKeyFormatter() diff --git a/image_prediction/extractor_classifier/__init__.py b/image_prediction/extractor_classifier/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/image_prediction/extractor_classifier/extractor_classifier.py b/image_prediction/extractor_classifier/extractor_classifier.py deleted file mode 100644 index 1b1b5b1..0000000 --- a/image_prediction/extractor_classifier/extractor_classifier.py +++ /dev/null @@ -1,33 +0,0 @@ -from itertools import chain -from typing import Iterable - -from funcy import chunks, rpartial - -from image_prediction.classifier.image_classifier import ImageClassifier -from image_prediction.image_extractor.extractor import ImageExtractor - - -class ExtractorClassifier: - """This class is responsible for orchestrating the pairing of classifications and image metadata. It extracts images - from an object and classifies them. Then it ties the classification together with the metadata. It returns an - iterable of dictionaries, where each dictionary has a field 'label' for the classification and possibly additional - fields for metadata -- metadata could be void. - """ - - def __init__(self, image_extractor: ImageExtractor, image_classifier: ImageClassifier): - self.classifier = image_classifier - self.extractor = image_extractor - - def __process_batch(self, batch, batch_size): - images, metadata = zip(*batch) - - predictions = self.classifier(images, batch_size) - responses = ({"classification": prd, **mdt} for prd, mdt in zip(predictions, metadata)) - return responses - - def __call__(self, obj, batch_size=16, **kwargs) -> Iterable[dict]: - - image_metadata_pairs = self.extractor(obj, **kwargs) - batches = chunks(batch_size, image_metadata_pairs) - predictions = chain.from_iterable(map(rpartial(self.__process_batch, batch_size), batches)) - yield from predictions diff --git a/image_prediction/image_extractor/extractors/parsable.py b/image_prediction/image_extractor/extractors/parsable.py index 3448f6f..00d3d1d 100644 --- a/image_prediction/image_extractor/extractors/parsable.py +++ b/image_prediction/image_extractor/extractors/parsable.py @@ -7,8 +7,7 @@ from typing import List import fitz from PIL import Image -from funcy import rcompose, merge, pluck, curry, compose, rpartial -from tqdm import tqdm +from funcy import rcompose, merge, pluck, curry, compose from image_prediction.image_extractor.extractor import ImageExtractor, ImageMetadataPair from image_prediction.info import Info @@ -35,18 +34,10 @@ class ParsablePDFImageExtractor(ImageExtractor): pages = extract_pages(self.doc, page_range) if page_range else self.doc - pages = self.__maybe_show_progress(pages, page_range) - image_metadata_pairs = chain.from_iterable(map(self.__process_images_on_page, pages)) yield from image_metadata_pairs - def __maybe_show_progress(self, iterable, page_range): - return self.__progressbar(page_range)(iterable) if self.verbose else iterable - - def __progressbar(self, page_range): - return rpartial(tqdm, desc=self.progress_message, total=len(page_range) if page_range else None) - def __process_images_on_page(self, page: fitz.fitz.Page): images = get_images_on_page(self.doc, page) metadata = get_metadata_for_images_on_page(self.doc, page) diff --git a/image_prediction/pipeline.py b/image_prediction/pipeline.py index fad4145..89785bc 100644 --- a/image_prediction/pipeline.py +++ b/image_prediction/pipeline.py @@ -1,10 +1,14 @@ import os +from functools import partial +from itertools import chain, tee -from funcy import rcompose +from funcy import rcompose, first, compose, second, chunks, identity +from tqdm import tqdm from image_prediction.config import CONFIG -from image_prediction.default_objects import get_extractor_classifier, get_formatter, get_mlflow_model_loader +from image_prediction.default_objects import get_formatter, get_mlflow_model_loader, get_image_classifier, get_extractor from image_prediction.locations import MLRUNS_DIR +from image_prediction.utils.generic import lift, starlift os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" @@ -18,9 +22,37 @@ def load_pipeline(**kwargs): return pipeline +def parallel(*fs): + return lambda *args: (f(a) for f, a in zip(fs, args)) + + +def star(f): + return lambda x: f(*x) + + class Pipeline: - def __init__(self, model_loader, model_identifier, **kwargs): - self.pipe = rcompose(get_extractor_classifier(model_loader, model_identifier, **kwargs), get_formatter()) + def __init__(self, model_loader, model_identifier, batch_size=16, **kwargs): + + extract = get_extractor(**kwargs) + classifier = get_image_classifier(model_loader, model_identifier) + reformat = get_formatter() + + split = compose(star(parallel(*map(lift, (first, second)))), tee) + classify = compose(chain.from_iterable, lift(classifier), partial(chunks, batch_size)) + pairwise_apply = compose(star, parallel) + join = compose(starlift(lambda prd, mdt: {"classification": prd, **mdt}), star(zip)) + + # +>--classify--v + # --extract-->--split--| |--join-->reformat + # +>--identity--^ + + self.pipe = rcompose( + extract, # ... image-metadata-pairs as a stream + split, # ... into an image stream and a metadata stream + pairwise_apply(classify, identity), # ... apply functions to the streams pairwise + join, # ... the streams by zipping + reformat, # ... the items + ) def __call__(self, pdf: bytes, page_range: range = None): - yield from self.pipe(pdf, page_range=page_range) + yield from tqdm(self.pipe(pdf, page_range=page_range), desc="Processing images from document", unit=" images") diff --git a/image_prediction/utils/generic.py b/image_prediction/utils/generic.py index 9b25640..de71a5c 100644 --- a/image_prediction/utils/generic.py +++ b/image_prediction/utils/generic.py @@ -1,3 +1,5 @@ +from itertools import starmap + from funcy import iterate, first, curry, map @@ -7,3 +9,7 @@ def until(cond, func, *args, **kwargs): def lift(fn): return curry(map)(fn) + + +def starlift(fn): + return curry(starmap)(fn) diff --git a/test/fixtures/extractor.py b/test/fixtures/extractor.py index d51e1bd..8aa6db2 100644 --- a/test/fixtures/extractor.py +++ b/test/fixtures/extractor.py @@ -14,4 +14,4 @@ def image_extractor(extractor_type): elif extractor_type == "default": return None else: - raise UnknownImageExtractor(f"No image extractor for type {extractor_type} was specified.") \ No newline at end of file + raise UnknownImageExtractor(f"No image extractor for type {extractor_type} was specified.") diff --git a/test/unit_tests/classifier_test.py b/test/unit_tests/classifier_test.py index c0e2baf..eaea50b 100644 --- a/test/unit_tests/classifier_test.py +++ b/test/unit_tests/classifier_test.py @@ -4,7 +4,7 @@ import pytest @pytest.mark.parametrize("estimator_type", ["mock", "keras", "redai"]) @pytest.mark.parametrize("label_format", ["index", "probability"]) def test_classifier(classifier, input_batch, expected_predictions_mapped): - predictions = classifier(input_batch) + predictions = list(classifier(input_batch)) assert predictions == expected_predictions_mapped diff --git a/test/unit_tests/extractor_classifier_test.py b/test/unit_tests/extractor_classifier_test.py deleted file mode 100644 index 3153ef4..0000000 --- a/test/unit_tests/extractor_classifier_test.py +++ /dev/null @@ -1,14 +0,0 @@ -from operator import itemgetter - -import pytest - -from image_prediction.extractor_classifier.extractor_classifier import ExtractorClassifier - - -@pytest.mark.parametrize("extractor_type", ["mock"]) -@pytest.mark.parametrize("estimator_type", ["mock", "keras"]) -def test_extractor_classifier(image_extractor, image_classifier, images, batch_of_expected_string_labels): - extractor_classifier = ExtractorClassifier(image_extractor, image_classifier) - results = extractor_classifier(images) - labels = list(map(itemgetter("classification"), results)) - assert labels == batch_of_expected_string_labels diff --git a/test/unit_tests/image_extractor_test.py b/test/unit_tests/image_extractor_test.py index 9c3d03d..e52b2b5 100644 --- a/test/unit_tests/image_extractor_test.py +++ b/test/unit_tests/image_extractor_test.py @@ -11,8 +11,8 @@ from image_prediction.extraction import extract_images_from_pdf from image_prediction.image_extractor.extractor import ImageMetadataPair from image_prediction.image_extractor.extractors.parsable import extract_pages, get_image_infos, has_alpha_channel from image_prediction.info import Info +from test.utils.comparison import metadata_equal, image_sets_equal from test.utils.generation.pdf import add_image, pdf_stream -from test.utils.comparison import images_equal, metadata_equal, image_sets_equal @pytest.mark.parametrize("extractor_type", ["mock"]) diff --git a/test/unit_tests/mocked_server_test.py b/test/unit_tests/mocked_server_test.py index 8b8a692..d64c937 100644 --- a/test/unit_tests/mocked_server_test.py +++ b/test/unit_tests/mocked_server_test.py @@ -37,12 +37,12 @@ def test_server_predict_failure(client, mute_logger): def test_server_health_check(client): - response = client.get("/ready") + response = client.get("/health") assert response.status_code == 200 assert response.json == "OK" def test_server_ready_check(client): - response = client.get("/health") + response = client.get("/ready") assert response.status_code == 200 assert response.json == "OK"