Matthias Bisping 26ef5fce8a Pull request #8: Pipeline refactoring
Merge in RR/image-prediction from pipeline_refactoring to tdd_refactoring

Squashed commit of the following:

commit 6989fcb3313007b7eecf4bba39077fcde6924a9a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Apr 25 09:49:49 2022 +0200

    removed obsolete module

commit 7428aeee37b11c31cffa597c85b018ba71e79a1d
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Apr 25 09:45:45 2022 +0200

    refactoring

commit 0dcd3894154fdf34bd3ba4ef816362434474f472
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Apr 25 08:57:21 2022 +0200

    refactoring; removed obsolete extractor-classifier

commit 1078aa81144f4219149b3fcacdae8b09c4b905c0
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Apr 22 17:18:10 2022 +0200

    removed obsolete imports

commit 71f61fc5fc915da3941cf5ed5d9cc90fccc49031
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Apr 22 17:16:25 2022 +0200

    comment changed

commit b582726cd1de233edb55c5a76c91e99f9dd3bd13
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Apr 22 17:12:11 2022 +0200

    refactoring

commit 8abc9010048078868b235d6793ac6c8b20abb985
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 21 21:25:47 2022 +0200

    formatting

commit 2c87c419fe3185a25c27139e7fcf79f60971ad24
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 21 21:24:05 2022 +0200

    formatting

commit 50b161192db43a84464125c6d79650225e1010d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 21 21:20:18 2022 +0200

    refactoring

commit 9a1446cccfa070852a5d9c0bdbc36037b82541fc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 21 21:04:57 2022 +0200

    refactoring

commit 6c10b55ff8e61412cb2fe5a5625e660ecaf1d7d1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 21 19:48:05 2022 +0200

    refactoring

commit 72e785e3e31c132ab352119e9921725f91fac9e2
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 21 19:43:39 2022 +0200

    refactoring

commit f036ee55e6747daf31e3929bdc2d93dc5f2a56ca
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Apr 20 18:30:41 2022 +0200

    refactoring pipeline WIP
2022-04-25 10:08:49 +02:00

59 lines
2.1 KiB
Python

import os
from functools import partial
from itertools import chain, tee
from funcy import rcompose, first, compose, second, chunks, identity
from tqdm import tqdm
from image_prediction.config import CONFIG
from image_prediction.default_objects import get_formatter, get_mlflow_model_loader, get_image_classifier, get_extractor
from image_prediction.locations import MLRUNS_DIR
from image_prediction.utils.generic import lift, starlift
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
def load_pipeline(**kwargs):
model_loader = get_mlflow_model_loader(MLRUNS_DIR)
model_identifier = CONFIG.service.run_id
pipeline = Pipeline(model_loader, model_identifier, progress_message="Processing document", **kwargs)
return pipeline
def parallel(*fs):
return lambda *args: (f(a) for f, a in zip(fs, args))
def star(f):
return lambda x: f(*x)
class Pipeline:
def __init__(self, model_loader, model_identifier, batch_size=16, **kwargs):
extract = get_extractor(**kwargs)
classifier = get_image_classifier(model_loader, model_identifier)
reformat = get_formatter()
split = compose(star(parallel(*map(lift, (first, second)))), tee)
classify = compose(chain.from_iterable, lift(classifier), partial(chunks, batch_size))
pairwise_apply = compose(star, parallel)
join = compose(starlift(lambda prd, mdt: {"classification": prd, **mdt}), star(zip))
# +>--classify--v
# --extract-->--split--| |--join-->reformat
# +>--identity--^
self.pipe = rcompose(
extract, # ... image-metadata-pairs as a stream
split, # ... into an image stream and a metadata stream
pairwise_apply(classify, identity), # ... apply functions to the streams pairwise
join, # ... the streams by zipping
reformat, # ... the items
)
def __call__(self, pdf: bytes, page_range: range = None):
yield from tqdm(self.pipe(pdf, page_range=page_range), desc="Processing images from document", unit=" images")