Pull request #22: Integrate image extraction with new pyinfra

Merge in RR/image-prediction from integrate-image-extraction-new-pyinfra to master

Squashed commit of the following:

commit 8470c065c71ea2a985aadfc399fb32c693e3a90f
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Aug 18 09:19:52 2022 +0200

    add key script

commit 8f6eb1e79083fb32fb7bedac640c10b6fd411899
Merge: 27fd7de c1b9629
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Aug 18 09:17:50 2022 +0200

    Merge branch 'master' of ssh://git.iqser.com:2222/rr/image-prediction into integrate-image-extraction-new-pyinfra

commit 27fd7de39a59d0d88fbddb471dd7797b61223ece
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Aug 17 13:15:09 2022 +0200

    update pyinfra

commit ca58f85642598dc15e286074982e7cedae9a1355
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Aug 16 16:16:10 2022 +0200

    update pdf2image-service

commit f43795cee0e211e14ac5f9296b01d440ae759c55
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Aug 15 10:32:02 2022 +0200

    update pipeline script to also work with figure detection metadata

commit 2b2da1b60ce56fb006cf2f6b65aeda9774391b2a
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Aug 12 13:37:48 2022 +0200

    add new pyinfra, add optional image classification under key dataCV if figure metadata is present on storage

commit bae25bedbd3a262a9d00e18a1b19f4ee6f1eb924
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Aug 10 13:27:41 2022 +0200

    tidy-up

commit 287b0ebc8a952e506185d13508eaa386d0420704
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Aug 10 12:57:35 2022 +0200

    update server logic for new pyinfra, add extraction from scanned PDF with figure detection logic

commit 3225cefaa25e4559b105397bc06c867a22806ba8
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Aug 10 10:37:31 2022 +0200

    integrate new pyinfra logic

commit 46926078342b0680a7416560bb69bec037cf8038
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Aug 3 13:15:27 2022 +0200

    add image extraction for scanned PDFs WIP

commit 1b3b11b6f9044d44cb9a822a78197a2ebc6f306a
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Aug 3 09:41:06 2022 +0200

    add pyinfra and pdf2image as git submodules
Julius Unverfehrt 2022-08-18 09:20:48 +02:00
parent c1b96290df
commit 520eee26e3
10 changed files with 124 additions and 37 deletions

.gitmodules

@@ -0,0 +1,6 @@
[submodule "incl/pyinfra"]
path = incl/pyinfra
url = ssh://git@git.iqser.com:2222/rr/pyinfra.git
[submodule "incl/pdf2image"]
path = incl/pdf2image
url = ssh://git@git.iqser.com:2222/rr/pdf2image.git


@@ -0,0 +1,8 @@
#!/bin/bash
set -e
mkdir -p ~/.ssh
echo "${bamboo.bamboo_agent_ssh}" | base64 -d >> ~/.ssh/id_rsa
echo "host vector.iqser.com" > ~/.ssh/config
echo " user bamboo-agent" >> ~/.ssh/config
chmod 600 ~/.ssh/config ~/.ssh/id_rsa


@@ -1,3 +1,5 @@
from typing import Iterable
from funcy import juxt
from image_prediction.classifier.classifier import Classifier
@@ -5,6 +7,7 @@ from image_prediction.classifier.image_classifier import ImageClassifier
from image_prediction.compositor.compositor import TransformerCompositor
from image_prediction.encoder.encoders.hash_encoder import HashEncoder
from image_prediction.estimator.adapter.adapter import EstimatorAdapter
from image_prediction.formatter.formatter import format_image_plus
from image_prediction.formatter.formatters.camel_case import Snake2CamelCaseKeyFormatter
from image_prediction.formatter.formatters.enum import EnumFormatter
from image_prediction.image_extractor.extractors.parsable import ParsablePDFImageExtractor
@@ -14,6 +17,7 @@ from image_prediction.model_loader.loaders.mlflow import MlflowConnector
from image_prediction.redai_adapter.mlflow import MlflowModelReader
from image_prediction.transformer.transformers.coordinate.pdfnet import PDFNetCoordinateTransformer
from image_prediction.transformer.transformers.response import ResponseTransformer
from pdf2img.extraction import extract_images_via_metadata
def get_mlflow_model_loader(mlruns_dir):
@@ -26,10 +30,17 @@ def get_image_classifier(model_loader, model_identifier):
return ImageClassifier(Classifier(EstimatorAdapter(model), ProbabilityMapper(classes)))
def get_extractor(**kwargs):
def get_dispatched_extract(**kwargs):
image_extractor = ParsablePDFImageExtractor(**kwargs)
return image_extractor
def extract(pdf: bytes, page_range: range = None, metadata_per_image: Iterable[dict] = None):
if metadata_per_image:
image_pluses = extract_images_via_metadata(pdf, metadata_per_image)
yield from map(format_image_plus, image_pluses)
else:
yield from image_extractor.extract(pdf, page_range)
return extract
def get_formatter():

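For reference, a minimal usage sketch of the dispatched extractor above (not taken from the diff): the sample file path and the schema of the figure-detection metadata are assumptions for illustration only.

# Illustrative usage of get_dispatched_extract; the path and metadata schema are hypothetical.
from image_prediction.default_objects import get_dispatched_extract

extract = get_dispatched_extract()  # kwargs are forwarded to ParsablePDFImageExtractor

with open("/tmp/sample.pdf", "rb") as f:
    pdf_bytes = f.read()

# No figure metadata: fall back to the parser-based extractor over an optional page range.
parsed = list(extract(pdf_bytes, page_range=range(0, 3)))

# With figure metadata (e.g. the "data" list of a FIGURE.json.gz object): metadata-driven extraction.
figure_metadata = [{"page": 0, "boundingBox": {"x0": 10, "y0": 20, "x1": 110, "y1": 220}}]  # made-up schema
with_metadata = list(extract(pdf_bytes, metadata_per_image=figure_metadata))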

@@ -1,6 +1,10 @@
import abc
from image_prediction.image_extractor.extractor import ImageMetadataPair
from image_prediction.info import Info
from image_prediction.transformer.transformer import Transformer
from pdf2img.default_objects.image import ImagePlus
class Formatter(Transformer):
@@ -13,3 +17,19 @@ class Formatter(Transformer):
def __call__(self, obj):
return self.format(obj)
def format_image_plus(image: ImagePlus) -> ImageMetadataPair:
enum_metadata = {
Info.PAGE_WIDTH: image.info.pageInfo.width,
Info.PAGE_HEIGHT: image.info.pageInfo.height,
Info.PAGE_IDX: image.info.pageInfo.number,
Info.ALPHA: image.info.alpha,
Info.WIDTH: image.info.boundingBox.width,
Info.HEIGHT: image.info.boundingBox.height,
Info.X1: image.info.boundingBox.x0,
Info.X2: image.info.boundingBox.x1,
Info.Y1: image.info.boundingBox.y0,
Info.Y2: image.info.boundingBox.y1,
}
return ImageMetadataPair(image.aspil(), enum_metadata)

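To illustrate the mapping performed by format_image_plus (not part of the diff), here is a stand-in object with the attribute layout the function reads; the SimpleNamespace shim and all values are hypothetical.

# Illustration only: a SimpleNamespace stand-in mimicking the ImagePlus attributes read above.
from types import SimpleNamespace

from image_prediction.formatter.formatter import format_image_plus

fake_image_plus = SimpleNamespace(
    info=SimpleNamespace(
        pageInfo=SimpleNamespace(width=595, height=842, number=0),
        alpha=False,
        boundingBox=SimpleNamespace(width=100, height=200, x0=10, x1=110, y0=20, y1=220),
    ),
    aspil=lambda: None,  # a real ImagePlus would return a PIL image here
)

pair = format_image_plus(fake_image_plus)
# -> ImageMetadataPair(<PIL image>, {Info.PAGE_WIDTH: 595, Info.PAGE_HEIGHT: 842, ...})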

@@ -1,6 +1,7 @@
import os
from functools import partial
from itertools import chain, tee
from typing import Iterable
from funcy import rcompose, first, compose, second, chunks, identity, rpartial
from tqdm import tqdm
@@ -10,8 +11,8 @@ from image_prediction.default_objects import (
get_formatter,
get_mlflow_model_loader,
get_image_classifier,
get_extractor,
get_encoder,
get_dispatched_extract,
)
from image_prediction.locations import MLRUNS_DIR
from image_prediction.utils.generic import lift, starlift
@@ -40,7 +41,7 @@ class Pipeline:
def __init__(self, model_loader, model_identifier, batch_size=16, verbose=True, **kwargs):
self.verbose = verbose
extract = get_extractor(**kwargs)
extract = get_dispatched_extract(**kwargs)
classifier = get_image_classifier(model_loader, model_identifier)
reformat = get_formatter()
represent = get_encoder()
@@ -62,9 +63,9 @@ class Pipeline:
reformat, # ... the items
)
def __call__(self, pdf: bytes, page_range: range = None):
def __call__(self, pdf: bytes, page_range: range = None, metadata_per_image: Iterable[dict] = None):
yield from tqdm(
self.pipe(pdf, page_range=page_range),
self.pipe(pdf, page_range=page_range, metadata_per_image=metadata_per_image),
desc="Processing images from document",
unit=" images",
disable=not self.verbose,

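A hedged sketch of the extended Pipeline call signature, using load_pipeline as the server and CLI do; the PDF path is a placeholder, and the metadata list stands in for whatever the figure-detection service produces.

# Sketch: the pipeline now accepts the same optional figure metadata as the extractor.
from image_prediction.pipeline import load_pipeline

pipeline = load_pipeline(verbose=True, batch_size=16)

with open("/tmp/sample.pdf", "rb") as f:
    pdf_bytes = f.read()

predictions = list(pipeline(pdf_bytes, page_range=range(0, 5)))        # parser-based path, as before
predictions_cv = list(pipeline(pdf_bytes, metadata_per_image=[...]))   # metadata-driven path (placeholder list)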

@@ -4,8 +4,7 @@ from image_prediction.locations import BANNER_FILE
def show_banner():
with open(BANNER_FILE) as f:
banner = "\n" + "".join(f.readlines()) + "\n"
banner = load_banner()
logger = logging.getLogger(__name__)
logger.propagate = False
@@ -19,3 +18,9 @@ def show_banner():
logger.addHandler(handler)
logger.info(banner)
def load_banner():
with open(BANNER_FILE) as f:
banner = "\n" + "".join(f.readlines()) + "\n"
return banner

incl/pdf2image Submodule

@@ -0,0 +1 @@
Subproject commit 9bb5a86310f065b852e16679cf37d5c939c0cacd

incl/pyinfra Submodule

@@ -0,0 +1 @@
Subproject commit be82114f8302ffedecf950c6ca9fecf01ece5573


@@ -2,6 +2,7 @@ import argparse
import json
import os
from glob import glob
from operator import truth
from image_prediction.pipeline import load_pipeline
from image_prediction.utils import get_logger
@@ -14,6 +15,7 @@ def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("input", help="pdf file or directory")
parser.add_argument("--metadata", help="optional figure detection metadata")
parser.add_argument("--print", "-p", help="print output to terminal", action="store_true", default=False)
parser.add_argument("--page_interval", "-i", help="page interval [i, j), min index = 0", nargs=2, type=int)
@@ -22,13 +24,17 @@ def parse_args():
return args
def process_pdf(pipeline, pdf_path, page_range=None):
def process_pdf(pipeline, pdf_path, metadata=None, page_range=None):
if metadata:
with open(metadata) as f:
metadata = json.load(f)
with open(pdf_path, "rb") as f:
logger.info(f"Processing {pdf_path}")
predictions = list(pipeline(f.read(), page_range=page_range))
predictions = list(pipeline(f.read(), page_range=page_range, metadata_per_image=metadata))
annotate_pdf(
pdf_path, predictions, os.path.join("/tmp", os.path.basename(pdf_path.replace(".pdf", "_annotated.pdf")))
pdf_path, predictions, os.path.join("/tmp", os.path.basename(pdf_path.replace(".pdf", f"_{truth(metadata)}_annotated.pdf")))
)
return predictions
@@ -42,9 +48,10 @@ def main(args):
else:
pdf_paths = glob(os.path.join(args.input, "*.pdf"))
page_range = range(*args.page_interval) if args.page_interval else None
metadata = args.metadata if args.metadata else None
for pdf_path in pdf_paths:
predictions = process_pdf(pipeline, pdf_path, page_range=page_range)
predictions = process_pdf(pipeline, pdf_path, metadata, page_range=page_range)
if args.print:
print(pdf_path)
print(json.dumps(predictions, indent=2))

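As a small clarification of the renamed annotated output (not in the diff): operator.truth collapses the metadata argument to a boolean, so annotated PDFs from the two code paths get distinct names. The path below is an example.

# Illustration of the annotated-output naming above; the input path is an example.
import os
from operator import truth

pdf_path = "/data/sample.pdf"
metadata = [{"page": 0}]  # any non-empty metadata list

out_path = os.path.join(
    "/tmp", os.path.basename(pdf_path.replace(".pdf", f"_{truth(metadata)}_annotated.pdf"))
)
print(out_path)  # /tmp/sample_True_annotated.pdf; with metadata=None it becomes /tmp/sample_False_annotated.pdf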

@@ -1,36 +1,63 @@
import gzip
import io
import json
import logging
from waitress import serve
from image_prediction.config import CONFIG
from image_prediction.flask import make_prediction_server
from image_prediction.config import Config
from image_prediction.locations import CONFIG_FILE
from image_prediction.pipeline import load_pipeline
from image_prediction.utils import get_logger
from image_prediction.utils.banner import show_banner
from image_prediction.utils.banner import load_banner
from pyinfra import config
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage.storage import get_storage
PYINFRA_CONFIG = config.get_config()
IMAGE_CONFIG = Config(CONFIG_FILE)
logging.getLogger().addHandler(logging.StreamHandler())
logger = logging.getLogger("main")
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
def process_request(request_message):
pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
target_file_extension = request_message["targetFileExtension"]
dossier_id = request_message["dossierId"]
file_id = request_message["fileId"]
storage = get_storage(PYINFRA_CONFIG)
object_bytes = storage.get_object(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{target_file_extension}")
object_bytes = gzip.decompress(object_bytes)
if storage.exists(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.FIGURE.json.gz"):
metadata_bytes = storage.get_object(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.FIGURE.json.gz")
metadata_bytes = gzip.decompress(metadata_bytes)
metadata_per_image = json.load(io.BytesIO(metadata_bytes))["data"]
classifications_cv = list(pipeline(pdf=object_bytes, metadata_per_image=metadata_per_image))
else:
classifications_cv = []
classifications = list(pipeline(pdf=object_bytes))
result = {**request_message, "data": classifications, "dataCV": classifications_cv}
response_file_extension = request_message["responseFileExtension"]
storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
storage.put_object(
PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{response_file_extension}", storage_bytes
)
return {"dossierId": dossier_id, "fileId": file_id}
def main():
logger.info(load_banner())
def predict(pdf):
# Keras service_estimator.predict stalls when service_estimator was loaded in different process;
# therefore, we re-load the model (part of the pipeline) every time we process a new document.
# https://stackoverflow.com/questions/42504669/keras-tensorflow-and-multiprocessing-in-python
logger.debug("Loading pipeline...")
pipeline = load_pipeline(verbose=CONFIG.service.verbose, batch_size=CONFIG.service.batch_size)
logger.debug("Running pipeline...")
return list(pipeline(pdf))
prediction_server = make_prediction_server(predict)
serve(prediction_server, host=CONFIG.webserver.host, port=CONFIG.webserver.port, _quiet=False)
queue_manager = QueueManager(PYINFRA_CONFIG)
queue_manager.start_consuming(process_request)
if __name__ == "__main__":
logging.basicConfig(level=CONFIG.service.logging_level)
logging.getLogger("PIL").setLevel(logging.ERROR)
logging.getLogger("h5py").setLevel(logging.ERROR)
logging.getLogger("pillow").setLevel(logging.ERROR)
logger = get_logger()
show_banner()
main()
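
Finally, a hedged sketch of the message contract the new queue consumer handles; the field values and the response extension below are placeholders, only the key names come from the code above.

# Placeholder message for process_request (values are made up; key names are from the code above).
request_message = {
    "dossierId": "dossier-123",
    "fileId": "file-456",
    "targetFileExtension": "pdf.gz",           # read:    dossier-123/file-456.pdf.gz (gzip-compressed PDF)
    "responseFileExtension": "IMAGE.json.gz",  # written: dossier-123/file-456.IMAGE.json.gz
}
# If dossier-123/file-456.FIGURE.json.gz exists on storage, its "data" list is fed to the pipeline
# as metadata_per_image and the results are stored under "dataCV"; "data" always holds the
# classifications from the parser-based extraction.
# process_request(request_message)  # requires a reachable pyinfra storage/queue backend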