Matthias Bisping 50b161192d refactoring
2022-04-21 21:20:18 +02:00

56 lines
2.0 KiB
Python

import os
from functools import partial
from itertools import chain, tee
from funcy import rcompose, first, compose, second, chunks, identity
from image_prediction.config import CONFIG
from image_prediction.default_objects import get_formatter, get_mlflow_model_loader, get_image_classifier, get_extractor
from image_prediction.locations import MLRUNS_DIR
from image_prediction.utils.generic import lift, starlift
# Raise the threshold of TensorFlow's native (C++) logging to level "3" so
# that its informational, warning and error chatter is suppressed.
# NOTE(review): TensorFlow usually reads this variable when it is first
# loaded -- confirm that none of the imports above already pull it in,
# otherwise this assignment may come too late to have an effect.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
def load_pipeline(**kwargs):
    """Build the service's default ``Pipeline``.

    The model loader is created for ``MLRUNS_DIR`` and the model is selected
    by the run id found at ``CONFIG.service.run_id``.

    Args:
        **kwargs: Forwarded unchanged to ``Pipeline``. ``progress_message``
            is already supplied here, so passing it again raises a
            ``TypeError``.

    Returns:
        A ready-to-call ``Pipeline`` instance.
    """
    return Pipeline(
        get_mlflow_model_loader(MLRUNS_DIR),
        CONFIG.service.run_id,
        progress_message="Processing document",
        **kwargs,
    )
def parallel(*fs):
    """Combine several one-argument functions into one positional fan-out.

    The returned callable takes as many positional arguments as there are
    functions and lazily yields ``fs[0](args[0])``, ``fs[1](args[1])``, ...
    Pairing is done with ``zip``, so surplus functions or surplus arguments
    are silently ignored.

    Args:
        *fs: Functions to apply, one per positional argument.

    Returns:
        A callable producing a generator of the individual results.
    """

    def apply_pairwise(*args):
        # Each function is only invoked when the generator is advanced.
        return (func(arg) for func, arg in zip(fs, args))

    return apply_pairwise
def splat(f):
    """Adapt ``f`` so that it accepts its arguments packed in one iterable.

    Args:
        f: The function to wrap.

    Returns:
        A one-argument callable ``g`` with ``g(x) == f(*x)``.
    """

    def call_unpacked(x):
        return f(*x)

    return call_unpacked
class Pipeline:
    """Callable that streams formatted image classifications for a PDF.

    The processing chain is assembled once in ``__init__`` and stored in
    ``self.pipe``; calling the instance feeds a document through it.
    """

    def __init__(self, model_loader, model_identifier, batch_size=16, **kwargs):
        """Assemble the processing chain.

        Args:
            model_loader: Passed to ``get_image_classifier``.
            model_identifier: Passed to ``get_image_classifier``.
            batch_size: Number of images grouped into one classifier call.
            **kwargs: Forwarded unchanged to ``get_extractor``.
        """
        extractor = get_extractor(**kwargs)
        image_classifier = get_image_classifier(model_loader, model_identifier)
        formatter = get_formatter()

        # NOTE(review): `lift` / `starlift` are defined in
        # image_prediction.utils.generic; presumably they turn an item-level
        # function into a stream-level one (`starlift` unpacking each item)
        # -- confirm against that module.

        def attach_prediction(prediction, metadata):
            # Metadata keys are spread last, as in a plain dict merge.
            return {"classification": prediction, **metadata}

        # Duplicate the pair stream, then keep the first element of each pair
        # on one copy and the second element on the other.
        fork = rcompose(tee, splat(parallel(lift(first), lift(second))))

        # Group images into batches, run the classifier on every batch and
        # flatten the per-batch results back into a single stream.
        classify_batched = rcompose(
            partial(chunks, batch_size),
            lift(image_classifier),
            chain.from_iterable,
        )

        # Re-pair predictions with their metadata and merge each pair.
        merge = rcompose(splat(zip), starlift(attach_prediction))

        #                       .--> classify_batched --.
        #   extractor --> fork -|                       |--> merge --> formatter
        #                       '--> identity ----------'
        stages = (
            extractor,  # image-metadata pairs as a stream
            fork,  # one image stream and one metadata stream
            splat(parallel(classify_batched, identity)),  # handled independently
            merge,  # streams combined again
            formatter,  # final shape of the items
        )
        self.pipe = rcompose(*stages)

    def __call__(self, pdf: bytes, page_range: range = None):
        """Yield the pipeline's results for ``pdf``.

        Args:
            pdf: The document, as raw bytes.
            page_range: Handed to the first pipeline stage as the keyword
                argument ``page_range``; ``None`` by default.

        Yields:
            The items produced by ``self.pipe``. Nothing is computed until
            iteration starts, because this method is a generator.
        """
        results = self.pipe(pdf, page_range=page_range)
        yield from results