diff --git a/.gitmodules b/.gitmodules
index e69de29..6f15bbc 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "incl/pyinfra"]
+	path = incl/pyinfra
+	url = ssh://git@git.iqser.com:2222/rr/pyinfra.git
+[submodule "incl/pdf2image"]
+	path = incl/pdf2image
+	url = ssh://git@git.iqser.com:2222/rr/pdf2image.git
diff --git a/bamboo-specs/src/main/resources/scripts/key-prepare.sh b/bamboo-specs/src/main/resources/scripts/key-prepare.sh
new file mode 100755
index 0000000..715b13a
--- /dev/null
+++ b/bamboo-specs/src/main/resources/scripts/key-prepare.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -e
+
+mkdir -p ~/.ssh
+echo "${bamboo.bamboo_agent_ssh}" | base64 -d >> ~/.ssh/id_rsa
+echo "host vector.iqser.com" > ~/.ssh/config
+echo " user bamboo-agent" >> ~/.ssh/config
+chmod 600 ~/.ssh/config ~/.ssh/id_rsa
\ No newline at end of file
diff --git a/image_prediction/default_objects.py b/image_prediction/default_objects.py
index 1c40d56..d66d477 100644
--- a/image_prediction/default_objects.py
+++ b/image_prediction/default_objects.py
@@ -1,3 +1,5 @@
+from typing import Iterable
+
 from funcy import juxt
 
 from image_prediction.classifier.classifier import Classifier
@@ -5,6 +7,7 @@ from image_prediction.classifier.image_classifier import ImageClassifier
 from image_prediction.compositor.compositor import TransformerCompositor
 from image_prediction.encoder.encoders.hash_encoder import HashEncoder
 from image_prediction.estimator.adapter.adapter import EstimatorAdapter
+from image_prediction.formatter.formatter import format_image_plus
 from image_prediction.formatter.formatters.camel_case import Snake2CamelCaseKeyFormatter
 from image_prediction.formatter.formatters.enum import EnumFormatter
 from image_prediction.image_extractor.extractors.parsable import ParsablePDFImageExtractor
@@ -14,6 +17,7 @@ from image_prediction.model_loader.loaders.mlflow import MlflowConnector
 from image_prediction.redai_adapter.mlflow import MlflowModelReader
 from image_prediction.transformer.transformers.coordinate.pdfnet import PDFNetCoordinateTransformer
 from image_prediction.transformer.transformers.response import ResponseTransformer
+from pdf2img.extraction import extract_images_via_metadata
 
 
 def get_mlflow_model_loader(mlruns_dir):
@@ -26,10 +30,17 @@ def get_image_classifier(model_loader, model_identifier):
     return ImageClassifier(Classifier(EstimatorAdapter(model), ProbabilityMapper(classes)))
 
 
-def get_extractor(**kwargs):
+def get_dispatched_extract(**kwargs):
     image_extractor = ParsablePDFImageExtractor(**kwargs)
 
-    return image_extractor
+    def extract(pdf: bytes, page_range: range = None, metadata_per_image: Iterable[dict] = None):
+        if metadata_per_image:
+            image_pluses = extract_images_via_metadata(pdf, metadata_per_image)
+            yield from map(format_image_plus, image_pluses)
+        else:
+            yield from image_extractor.extract(pdf, page_range)
+
+    return extract
 
 
 def get_formatter():
diff --git a/image_prediction/formatter/formatter.py b/image_prediction/formatter/formatter.py
index 3f3a1f8..53306a9 100644
--- a/image_prediction/formatter/formatter.py
+++ b/image_prediction/formatter/formatter.py
@@ -1,6 +1,10 @@
 import abc
 
+from image_prediction.image_extractor.extractor import ImageMetadataPair
+from image_prediction.info import Info
+
 from image_prediction.transformer.transformer import Transformer
+from pdf2img.default_objects.image import ImagePlus
 
 
 class Formatter(Transformer):
@@ -13,3 +17,19 @@ class Formatter(Transformer):
 
     def __call__(self, obj):
         return self.format(obj)
+
+
+def format_image_plus(image: ImagePlus) -> ImageMetadataPair:
+    enum_metadata = {
+        Info.PAGE_WIDTH: image.info.pageInfo.width,
+        Info.PAGE_HEIGHT: image.info.pageInfo.height,
+        Info.PAGE_IDX: image.info.pageInfo.number,
+        Info.ALPHA: image.info.alpha,
+        Info.WIDTH: image.info.boundingBox.width,
+        Info.HEIGHT: image.info.boundingBox.height,
+        Info.X1: image.info.boundingBox.x0,
+        Info.X2: image.info.boundingBox.x1,
+        Info.Y1: image.info.boundingBox.y0,
+        Info.Y2: image.info.boundingBox.y1,
+    }
+    return ImageMetadataPair(image.aspil(), enum_metadata)
diff --git a/image_prediction/pipeline.py b/image_prediction/pipeline.py
index 6d29ac7..f9383a1 100644
--- a/image_prediction/pipeline.py
+++ b/image_prediction/pipeline.py
@@ -1,6 +1,7 @@
 import os
 from functools import partial
 from itertools import chain, tee
+from typing import Iterable
 
 from funcy import rcompose, first, compose, second, chunks, identity, rpartial
 from tqdm import tqdm
@@ -10,8 +11,8 @@ from image_prediction.default_objects import (
     get_formatter,
     get_mlflow_model_loader,
     get_image_classifier,
-    get_extractor,
     get_encoder,
+    get_dispatched_extract,
 )
 from image_prediction.locations import MLRUNS_DIR
 from image_prediction.utils.generic import lift, starlift
@@ -40,7 +41,7 @@ class Pipeline:
     def __init__(self, model_loader, model_identifier, batch_size=16, verbose=True, **kwargs):
         self.verbose = verbose
 
-        extract = get_extractor(**kwargs)
+        extract = get_dispatched_extract(**kwargs)
         classifier = get_image_classifier(model_loader, model_identifier)
         reformat = get_formatter()
         represent = get_encoder()
@@ -62,9 +63,9 @@ class Pipeline:
             reformat,  # ... the items
         )
 
-    def __call__(self, pdf: bytes, page_range: range = None):
+    def __call__(self, pdf: bytes, page_range: range = None, metadata_per_image: Iterable[dict] = None):
         yield from tqdm(
-            self.pipe(pdf, page_range=page_range),
+            self.pipe(pdf, page_range=page_range, metadata_per_image=metadata_per_image),
             desc="Processing images from document",
             unit=" images",
             disable=not self.verbose,
diff --git a/image_prediction/utils/banner.py b/image_prediction/utils/banner.py
index 6a17d93..1cf5ede 100644
--- a/image_prediction/utils/banner.py
+++ b/image_prediction/utils/banner.py
@@ -4,8 +4,7 @@ from image_prediction.locations import BANNER_FILE
 
 
 def show_banner():
-    with open(BANNER_FILE) as f:
-        banner = "\n" + "".join(f.readlines()) + "\n"
+    banner = load_banner()
 
     logger = logging.getLogger(__name__)
     logger.propagate = False
@@ -19,3 +18,9 @@ def show_banner():
         logger.addHandler(handler)
 
     logger.info(banner)
+
+
+def load_banner():
+    with open(BANNER_FILE) as f:
+        banner = "\n" + "".join(f.readlines()) + "\n"
+    return banner
diff --git a/incl/pdf2image b/incl/pdf2image
new file mode 160000
index 0000000..9bb5a86
--- /dev/null
+++ b/incl/pdf2image
@@ -0,0 +1 @@
+Subproject commit 9bb5a86310f065b852e16679cf37d5c939c0cacd
diff --git a/incl/pyinfra b/incl/pyinfra
new file mode 160000
index 0000000..be82114
--- /dev/null
+++ b/incl/pyinfra
@@ -0,0 +1 @@
+Subproject commit be82114f8302ffedecf950c6ca9fecf01ece5573
diff --git a/scripts/run_pipeline.py b/scripts/run_pipeline.py
index c2b4bb0..29d3199 100644
--- a/scripts/run_pipeline.py
+++ b/scripts/run_pipeline.py
@@ -2,6 +2,7 @@ import argparse
 import json
 import os
 from glob import glob
+from operator import truth
 
 from image_prediction.pipeline import load_pipeline
 from image_prediction.utils import get_logger
@@ -14,6 +15,7 @@ def parse_args():
     parser = argparse.ArgumentParser()
 
     parser.add_argument("input", help="pdf file or directory")
+    parser.add_argument("--metadata", help="optional figure detection metadata")
     parser.add_argument("--print", "-p", help="print output to terminal", action="store_true", default=False)
     parser.add_argument("--page_interval", "-i", help="page interval [i, j), min index = 0", nargs=2, type=int)
 
@@ -22,13 +24,17 @@ def parse_args():
     return args
 
 
-def process_pdf(pipeline, pdf_path, page_range=None):
+def process_pdf(pipeline, pdf_path, metadata=None, page_range=None):
+    if metadata:
+        with open(metadata) as f:
+            metadata = json.load(f)
+
     with open(pdf_path, "rb") as f:
         logger.info(f"Processing {pdf_path}")
-        predictions = list(pipeline(f.read(), page_range=page_range))
+        predictions = list(pipeline(f.read(), page_range=page_range, metadata_per_image=metadata))
 
     annotate_pdf(
-        pdf_path, predictions, os.path.join("/tmp", os.path.basename(pdf_path.replace(".pdf", "_annotated.pdf")))
+        pdf_path, predictions, os.path.join("/tmp", os.path.basename(pdf_path.replace(".pdf", f"_{truth(metadata)}_annotated.pdf")))
     )
 
     return predictions
@@ -42,9 +48,10 @@ def main(args):
     else:
         pdf_paths = glob(os.path.join(args.input, "*.pdf"))
     page_range = range(*args.page_interval) if args.page_interval else None
+    metadata = args.metadata if args.metadata else None
 
     for pdf_path in pdf_paths:
-        predictions = process_pdf(pipeline, pdf_path, page_range=page_range)
+        predictions = process_pdf(pipeline, pdf_path, metadata, page_range=page_range)
         if args.print:
             print(pdf_path)
             print(json.dumps(predictions, indent=2))
diff --git a/src/serve.py b/src/serve.py
index 37a7906..719e88e 100644
--- a/src/serve.py
+++ b/src/serve.py
@@ -1,36 +1,63 @@
+import gzip
+import io
+import json
 import logging
 
-from waitress import serve
-
-from image_prediction.config import CONFIG
-from image_prediction.flask import make_prediction_server
+from image_prediction.config import Config
+from image_prediction.locations import CONFIG_FILE
 from image_prediction.pipeline import load_pipeline
-from image_prediction.utils import get_logger
-from image_prediction.utils.banner import show_banner
+from image_prediction.utils.banner import load_banner
+from pyinfra import config
+from pyinfra.queue.queue_manager import QueueManager
+from pyinfra.storage.storage import get_storage
+
+PYINFRA_CONFIG = config.get_config()
+IMAGE_CONFIG = Config(CONFIG_FILE)
+
+logging.getLogger().addHandler(logging.StreamHandler())
+logger = logging.getLogger("main")
+logger.setLevel(PYINFRA_CONFIG.logging_level_root)
+
+
+def process_request(request_message):
+    pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
+
+    target_file_extension = request_message["targetFileExtension"]
+    dossier_id = request_message["dossierId"]
+    file_id = request_message["fileId"]
+
+    storage = get_storage(PYINFRA_CONFIG)
+
+    object_bytes = storage.get_object(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{target_file_extension}")
+    object_bytes = gzip.decompress(object_bytes)
+
+    if storage.exists(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.FIGURE.json.gz"):
+        metadata_bytes = storage.get_object(PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.FIGURE.json.gz")
+        metadata_bytes = gzip.decompress(metadata_bytes)
+        metadata_per_image = json.load(io.BytesIO(metadata_bytes))["data"]
+        classifications_cv = list(pipeline(pdf=object_bytes, metadata_per_image=metadata_per_image))
+    else:
+        classifications_cv = []
+
+    classifications = list(pipeline(pdf=object_bytes))
+
+    result = {**request_message, "data": classifications, "dataCV": classifications_cv}
+
+    response_file_extension = request_message["responseFileExtension"]
+    storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
+    storage.put_object(
+        PYINFRA_CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{response_file_extension}", storage_bytes
+    )
+
+    return {"dossierId": dossier_id, "fileId": file_id}
 
 
 def main():
+    logger.info(load_banner())
 
-    def predict(pdf):
-        # Keras service_estimator.predict stalls when service_estimator was loaded in different process;
-        # therefore, we re-load the model (part of the pipeline) every time we process a new document.
-        # https://stackoverflow.com/questions/42504669/keras-tensorflow-and-multiprocessing-in-python
-        logger.debug("Loading pipeline...")
-        pipeline = load_pipeline(verbose=CONFIG.service.verbose, batch_size=CONFIG.service.batch_size)
-        logger.debug("Running pipeline...")
-        return list(pipeline(pdf))
-
-    prediction_server = make_prediction_server(predict)
-    serve(prediction_server, host=CONFIG.webserver.host, port=CONFIG.webserver.port, _quiet=False)
+    queue_manager = QueueManager(PYINFRA_CONFIG)
+    queue_manager.start_consuming(process_request)
 
 
 if __name__ == "__main__":
-    logging.basicConfig(level=CONFIG.service.logging_level)
-    logging.getLogger("PIL").setLevel(logging.ERROR)
-    logging.getLogger("h5py").setLevel(logging.ERROR)
-    logging.getLogger("pillow").setLevel(logging.ERROR)
-    logger = get_logger()
-
-    show_banner()
-
     main()