Julius Unverfehrt 2995d5ee48 refactoring
2023-02-03 11:14:14 +01:00

246 lines
8.2 KiB
Python

import atexit
import json
import traceback
from _operator import itemgetter
from functools import partial, lru_cache
from itertools import chain, starmap, filterfalse
from operator import itemgetter, truth
from typing import Iterable, Iterator, List, Union
import fitz
import numpy as np
from PIL import Image
from funcy import merge, pluck, curry, compose, rcompose, remove, keep
from image_prediction.config import CONFIG
from image_prediction.exceptions import InvalidBox
from image_prediction.formatter.formatters.enum import EnumFormatter
from image_prediction.image_extractor.extractor import ImageExtractor, ImageMetadataPair
from image_prediction.info import Info
from image_prediction.stitching.stitching import stitch_pairs
from image_prediction.stitching.utils import validate_box
from image_prediction.transformer.transformers.response import compute_geometric_quotient
from image_prediction.utils import get_logger
from image_prediction.utils.generic import lift
logger = get_logger()
class ParsablePDFImageExtractor(ImageExtractor):
def __init__(self, verbose=False, tolerance=0):
"""
Args:
verbose: Whether to show progressbar
tolerance: The tolerance in pixels for the distance between images, beyond which they will not be stitched
together
"""
self.doc: fitz.fitz.Document = None
self.verbose = verbose
self.tolerance = tolerance
def extract(self, pdf: bytes, page_range: range = None):
self.doc = fitz.Document(stream=pdf)
pages = extract_pages(self.doc, page_range) if page_range else self.doc
image_metadata_pairs = chain.from_iterable(map(self.__process_images_on_page, pages))
yield from image_metadata_pairs
def __process_images_on_page(self, page: fitz.fitz.Page):
metadata = extract_valid_metadata(self.doc, page)
images = get_images_on_page(self.doc, metadata)
clear_caches()
image_metadata_pairs = starmap(ImageMetadataPair, filter(all, zip(images, metadata)))
# TODO: In the future, consider to introduce an image validator as a pipeline component rather than doing the
# validation here. Invalid images can then be split into a different stream and joined with the intact images
# again for the formatting step.
image_metadata_pairs = self.__filter_valid_images(image_metadata_pairs)
image_metadata_pairs = stitch_pairs(list(image_metadata_pairs), tolerance=self.tolerance)
yield from image_metadata_pairs
@staticmethod
def __filter_valid_images(image_metadata_pairs: Iterable[ImageMetadataPair]) -> Iterator[ImageMetadataPair]:
def validate(image: Image.Image, metadata: dict):
try:
# TODO: stand-in heuristic for testing if image is valid => find cleaner solution (RED-5148)
image.resize((100, 100)).convert("RGB")
return ImageMetadataPair(image, metadata)
except (OSError, Exception) as err:
metadata = json.dumps(EnumFormatter()(metadata), indent=2)
logger.warning(f"Invalid image encountered. Image metadata:\n{metadata}\n\n{traceback.format_exc()}")
return None
return filter(truth, starmap(validate, image_metadata_pairs))
def extract_pages(doc, page_range):
page_range = range(page_range.start + 1, page_range.stop + 1)
pages = map(doc.load_page, page_range)
yield from pages
def get_images_on_page(doc, metadata):
xrefs = pluck(Info.XREF, metadata)
images = map(partial(xref_to_image, doc), xrefs)
yield from images
def extract_valid_metadata(doc: fitz.fitz.Document, page: fitz.fitz.Page):
return compose(
list,
partial(add_alpha_channel_info, doc),
filter_valid_metadata,
get_metadata_for_images_on_page,
)(page)
def get_metadata_for_images_on_page(page: fitz.Page):
metadata = map(get_image_metadata, get_image_infos(page))
metadata = add_page_metadata(page, metadata)
yield from metadata
def filter_valid_metadata(metadata):
yield from compose(
# TODO: Disabled for now, since atm since the backend needs atm the metadata and the hash of every image, even
# scanned pages. In the future, this should be resolved differently, e.g. by filtering all page-sized images
# and giving the user the ability to reclassify false positives with a separate call.
# filter_out_page_sized_images,
filter_out_tiny_images,
filter_out_invalid_metadata,
)(metadata)
def filter_out_invalid_metadata(metadata):
def __validate_box(box):
try:
return validate_box(box)
except InvalidBox as err:
logger.debug(f"Dropping invalid metadatum, reason: {err}")
yield from keep(__validate_box, metadata)
def filter_out_page_sized_images(metadata):
yield from remove(breaches_image_to_page_quotient, metadata)
def filter_out_tiny_images(metadata):
yield from filterfalse(tiny, metadata)
@lru_cache(maxsize=None)
def get_image_infos(page: fitz.Page) -> List[dict]:
return page.get_image_info(xrefs=True)
@lru_cache(maxsize=None)
def xref_to_image(doc, xref) -> Union[Image.Image, None]:
# NOTE: image extraction is done via pixmap to array, as this method is twice as fast as extraction via bytestream
try:
pixmap = fitz.Pixmap(doc, xref)
array = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(pixmap.h, pixmap.w, pixmap.n)
# TODO: Find a better solution: PIL.Image.fromarray doesn't take grayscale images of the shape (h, w, 1) but (h, w)
array = array[:, :, 0] if array.shape[2] == 1 else array
return Image.fromarray(array)
except ValueError:
logger.debug(f"Xref {xref} is invalid, skipping extraction ...")
return
def get_image_metadata(image_info):
xref, coords = itemgetter("xref", "bbox")(image_info)
x1, y1, x2, y2 = map(rounder, coords)
width = abs(x2 - x1)
height = abs(y2 - y1)
return {
Info.WIDTH: width,
Info.HEIGHT: height,
Info.X1: x1,
Info.X2: x2,
Info.Y1: y1,
Info.Y2: y2,
Info.XREF: xref,
}
def add_page_metadata(page, metadata):
yield from map(partial(merge, get_page_metadata(page)), metadata)
def add_alpha_channel_info(doc, metadata):
def add_alpha_value_to_metadatum(metadatum):
alpha = metadatum_to_alpha_value(metadatum)
return {**metadatum, Info.ALPHA: alpha}
xref_to_alpha = partial(has_alpha_channel, doc)
metadatum_to_alpha_value = compose(xref_to_alpha, itemgetter(Info.XREF))
yield from map(add_alpha_value_to_metadatum, metadata)
@lru_cache(maxsize=None)
def load_image_handle_from_xref(doc, xref):
return doc.extract_image(xref)
rounder = rcompose(round, int)
def get_page_metadata(page):
page_width, page_height = map(rounder, page.mediabox_size)
return {
Info.PAGE_WIDTH: page_width,
Info.PAGE_HEIGHT: page_height,
Info.PAGE_IDX: page.number,
}
def has_alpha_channel(doc, xref):
maybe_image = load_image_handle_from_xref(doc, xref)
maybe_smask = maybe_image["smask"] if maybe_image else None
if maybe_smask:
return any([doc.extract_image(maybe_smask) is not None, bool(fitz.Pixmap(doc, maybe_smask).alpha)])
else:
try:
return bool(fitz.Pixmap(doc, xref).alpha)
except ValueError:
logger.debug(f"Encountered invalid xref `{xref}` in {doc.metadata.get('title', '<no title>')}.")
return False
def tiny(metadata):
return metadata[Info.WIDTH] * metadata[Info.HEIGHT] <= 4
def clear_caches():
get_image_infos.cache_clear()
load_image_handle_from_xref.cache_clear()
xref_to_image.cache_clear()
atexit.register(clear_caches)
def breaches_image_to_page_quotient(metadatum):
page_width, page_height, x1, x2, y1, y2, width, height = itemgetter(
Info.PAGE_WIDTH, Info.PAGE_HEIGHT, Info.X1, Info.X2, Info.Y1, Info.Y2, Info.WIDTH, Info.HEIGHT
)(metadatum)
geometric_quotient = compute_geometric_quotient(page_width, page_height, x2, x1, y2, y1)
quotient_breached = bool(geometric_quotient > CONFIG.filters.image_to_page_quotient.max)
return quotient_breached