Merge branch 'master' of ssh://git.iqser.com:2222/rr/cv-analysis

This commit is contained in:
Isaac Riley 2022-07-26 13:13:29 +02:00
commit 1618909d8e
15 changed files with 299 additions and 119 deletions

View File

@ -8,5 +8,5 @@ webserver:
port: $SERVER_PORT|5000 # webserver port
visual_logging:
level: DEBUG # NOTHING > INFO > DEBUG > ALL
level: DISABLED # NOTHING > INFO > DEBUG > ALL
output_folder: /tmp/debug/

View File

@ -0,0 +1,64 @@
from functools import partial
from typing import Callable
from funcy import lmap
from cv_analysis.figure_detection.figure_detection_pipeline import make_figure_detection_pipeline
from cv_analysis.layout_parsing import parse_layout
from cv_analysis.server.rotate import rotate_rectangle
from cv_analysis.table_parsing import parse_tables
from cv_analysis.utils.logging import get_logger
from cv_analysis.utils.pdf2image import pdf_to_image_metadata_pairs
from cv_analysis.utils.structures import Rectangle
logger = get_logger()
def make_analysis_pipeline(analysis_fn: Callable, dpi=200):
    """Build an end-to-end PDF analysis pipeline around ``analysis_fn``.

    The returned ``pipeline(pdf, index=None)`` callable lazily produces one dict
    per page, made of the page metadata plus a ``"bboxes"`` list. Pages whose
    ``"bboxes"`` list is empty are dropped from the stream.

    Steps:
        Convert the PDF to per-page image arrays and page metadata
        Analyse each page image to get rectangles (e.g. table cells)
        Convert the pixel rectangles with ``pixel_rect_to_inches_rect``
        Rotate the rectangles if the page metadata reports a rotation
        Format the results as a stream of dictionaries

    Args:
        analysis_fn: Callable taking a page image and returning rectangles.
        dpi: Resolution used for rendering and for the pixel conversion.

    Returns:
        The ``pipeline`` callable described above.
    """

    def has_bboxes(page_result):
        # Truthiness check: an empty bbox list filters the page out.
        return page_result["bboxes"]

    def analyse_page(image_metadata_pair):
        found = analysis_fn(image_metadata_pair.image)
        page_info = image_metadata_pair.metadata
        converted = (pixel_rect_to_inches_rect(rect, dpi=dpi) for rect in found)
        if page_info["rotation"] != 0:
            converted = (rotate_rectangle(rect, metadata=page_info) for rect in converted)
        return {**page_info, "bboxes": [rect.json_xyxy() for rect in converted]}

    def pipeline(pdf: bytes, index=None):
        pages = pdf_to_image_metadata_pairs(pdf, index=index, dpi=dpi)
        return filter(has_bboxes, map(analyse_page, pages))

    return pipeline
def get_analysis_fn(analysis_type):
    """Return the analysis callable that belongs to ``analysis_type``.

    Args:
        analysis_type: One of ``"table"``, ``"layout"`` or ``"figure"``.

    Returns:
        ``parse_tables`` for ``"table"``, ``parse_layout`` for ``"layout"``, or
        a newly built figure detection pipeline for ``"figure"``.

    Raises:
        ValueError: If ``analysis_type`` is none of the supported names.
    """
    if analysis_type == "table":
        return parse_tables
    if analysis_type == "layout":
        return parse_layout
    if analysis_type == "figure":
        # Built on demand so the other analysis types never pay for its construction.
        return make_figure_detection_pipeline()
    # A bare ``raise`` has no active exception here and would only surface as an
    # unrelated RuntimeError; report the offending value instead.
    raise ValueError(
        f"Unknown analysis type {analysis_type!r}; expected one of 'table', 'layout', 'figure'."
    )
def pixel_rect_to_inches_rect(rect, dpi):
    """Rescale a pixel rectangle from image resolution ``dpi`` to page units.

    Every corner coordinate is mapped with ``pixel / dpi * 72``, i.e. inches
    multiplied by 72, so the result is in units of 1/72 inch despite the
    function name.

    Args:
        rect: Object exposing ``x1``, ``y1``, ``x2`` and ``y2`` in pixels.
        dpi: Resolution the pixel coordinates were measured at.

    Returns:
        A non-discrete ``Rectangle`` built from the rescaled corners.
    """
    corners_in_pixels = (rect.x1, rect.y1, rect.x2, rect.y2)
    rescaled = tuple(value / dpi * 72 for value in corners_in_pixels)
    return Rectangle.from_xyxy(rescaled, discrete=False)

View File

@ -1,35 +1,25 @@
from _operator import itemgetter
from functools import partial
import numpy as np
from cv_analysis.utils.structures import Rectangle
def make_formatter(dpi, page_size, rotation):
def rotate_rectangle(rectangle, metadata):
width, height, rotation = itemgetter("width", "height", "rotation")(metadata)
rotation = rotation // 90 if rotation not in [0, 1, 2, 3] else rotation
def format_(key2pixel):
convert = partial(convert_pixel_to_inch, dpi=dpi)
x, y, w, h = map(convert, itemgetter("x", "y", "width", "height")(key2pixel))
x1, y1 = x + w, y + h
matrix = np.vstack([[x, y], [x1, y1]]).T
new_matrix = rotate_and_shift(matrix, rotation, page_size)
x1, x2 = sorted(new_matrix[0, :])
y1, y2 = sorted(new_matrix[1, :])
return Rectangle.from_xyxy((x1, y1, x2, y2), discrete=False).json_xywh()
if rotation in [1, 3]:
width, height = height, width
return format_
x1, y1, x2, y2 = rectangle.xyxy()
matrix = np.vstack([[x1, y1], [x2, y2]]).T
new_matrix = rotate_and_shift(matrix, rotation, (width, height))
x1, x2 = sorted(new_matrix[0, :])
y1, y2 = sorted(new_matrix[1, :])
def convert_pixel_to_inch(pixel, dpi):
    """Rescale a pixel length measured at ``dpi`` to units of 1/72 inch.

    Args:
        pixel: Length in pixels.
        dpi: Resolution the pixel length was measured at.

    Returns:
        ``pixel / dpi * 72``.
    """
    inches = pixel / dpi
    return inches * 72
def rotate(input_matrix, radians):
    """Left-multiply ``input_matrix`` by the 2x2 rotation matrix for ``radians``.

    Args:
        input_matrix: Array whose first axis has length 2 (x row, y row).
        radians: Rotation angle in radians.

    Returns:
        The product ``R(radians) . input_matrix``.
    """
    cosine = np.cos(radians)
    sine = np.sin(radians)
    top_row = [cosine, -sine]
    bottom_row = [sine, cosine]
    return np.dot(np.vstack([top_row, bottom_row]), input_matrix)
return Rectangle.from_xyxy((x1, y1, x2, y2), discrete=False)
def rotate_and_shift(matrix, rotation, size, debug=False):
@ -109,3 +99,9 @@ def __show_matrices(size, radians, matrix, matrix_rotated, matrix_rotated_and_sh
axes[1].quiver([0, 0], [0, 0], m3[0, :], m3[1, :], scale=5, scale_units="inches", color="blue")
plt.show()
def rotate(input_matrix, radians):
    """Rotate the column vectors of ``input_matrix`` by ``radians``.

    The 2x2 rotation matrix ``[[cos, -sin], [sin, cos]]`` is applied from the
    left via ``np.dot``.

    Args:
        input_matrix: Array whose first axis has length 2 (x row, y row).
        radians: Rotation angle in radians.

    Returns:
        The rotated matrix.
    """
    cos_value, sin_value = np.cos(radians), np.sin(radians)
    rotation_matrix = np.vstack([[cos_value, -sin_value], [sin_value, cos_value]])
    rotated = np.dot(rotation_matrix, input_matrix)
    return rotated

View File

@ -1,43 +0,0 @@
import gzip
from operator import itemgetter
from typing import Callable
from funcy import lmap
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from cv_analysis.server.format import make_formatter
from cv_analysis.utils.logging import get_logger
from cv_analysis.utils.open_pdf import open_pdf
logger = get_logger()
def make_streamable_analysis_fn(analysis_fn: Callable):
    """Wrap a cv-analysis function for use with the pyinfra server logic.

    The wrapped function works on data and metadata and hands back data and
    metadata. For more information about the server logic, see the PyInfra
    documentation.

    Args:
        analysis_fn: cv-analysis function

    Returns:
        wrapped function
    """

    def analyse(data: bytes, metadata: dict):
        # Only the first element returned by open_pdf is analysed.
        pdf_bytes = gzip.decompress(data)
        image = open_pdf(pdf_bytes)[0]

        dpi = metadata["image_info"]["dpi"]
        page_info = metadata["page_info"]
        page_size = (page_info["width"], page_info["height"])
        formatter = make_formatter(dpi, page_size, page_info["rotation"])

        cells = [formatter(rectangle.json_xywh()) for rectangle in analysis_fn(image)]

        logger.debug(f"Page {metadata['page_info'].get('index', '')}: Found {len(cells)} cells.")
        # No payload is returned; the results travel in the metadata.
        return b"", {**metadata, "cells": cells}

    return make_streamable_and_wrap_in_packing_logic(analyse, batched=False)

View File

@ -0,0 +1,45 @@
from dataclasses import dataclass
from functools import partial
from typing import Iterator, Tuple
import fitz
import numpy as np
@dataclass
class ImageMetadataPair:
    """Pair of a rendered page image and the metadata describing that page."""

    # Page pixels as built by page_to_image_metadata_pair: (height, width, channels).
    image: np.ndarray
    # Page information dict as produced by get_page_info.
    metadata: dict
def pdf_to_image_metadata_pairs(pdf: bytes, index=None, dpi=200) -> Iterator[ImageMetadataPair]:
    """Lazily yield one ``ImageMetadataPair`` per selected page of ``pdf``.

    Args:
        pdf: PDF document as bytes.
        index: Page numbers to process; when falsy (e.g. ``None``) the whole
            document is processed.
        dpi: Resolution each page is rendered at.

    Yields:
        Image/metadata pairs in the order the pages are streamed.
    """
    for page in stream_pages(pdf, index):
        yield page_to_image_metadata_pair(page, dpi=dpi)
def page_to_image_metadata_pair(page: fitz.Page, dpi):
    """Render ``page`` at ``dpi`` and bundle the pixels with the page metadata.

    Args:
        page: PyMuPDF page to render.
        dpi: Resolution passed to ``page.get_pixmap``.

    Returns:
        ``ImageMetadataPair`` holding a ``uint8`` array of shape
        ``(pixmap.h, pixmap.w, pixmap.n)`` and the dict from ``get_page_info``.
    """
    page_info = get_page_info(page)
    rendered = page.get_pixmap(dpi=dpi)
    flat_samples = np.frombuffer(rendered.samples, dtype=np.uint8)
    pixels = flat_samples.reshape(rendered.h, rendered.w, rendered.n)
    return ImageMetadataPair(image=pixels, metadata=page_info)
def stream_pages(pdf: bytes, index=None) -> Iterator[fitz.Page]:
    """Yield pages of ``pdf`` while the document is held open.

    Args:
        pdf: PDF document as bytes.
        index: Iterable of page numbers to yield. When falsy (``None``, empty),
            every page of the document is yielded instead.

    Yields:
        The selected ``fitz.Page`` objects.
    """
    with fitz.open(stream=pdf) as document:
        if index:
            for page_number in index:
                yield document[page_number]
        else:
            yield from document
def get_page_info(page):
    """Collect index, rotation and size of ``page`` into a plain dict.

    Args:
        page: Object exposing ``number``, ``rotation`` and ``rect`` (with
            ``width`` and ``height``), e.g. a PyMuPDF page.

    Returns:
        Dict with the keys ``"index"``, ``"rotation"``, ``"width"`` and ``"height"``.
    """
    info = {"index": page.number, "rotation": page.rotation}
    bounds = page.rect
    # NOTE(review): the original comments called these "rotated page width/height
    # in inches"; PyMuPDF rectangles are presumably in PDF points (1/72 inch),
    # which matches the `/ dpi * 72` conversion used for bboxes — confirm.
    info["width"] = bounds.width
    info["height"] = bounds.height
    return info

View File

@ -16,4 +16,7 @@ coverage~=5.5
dependency-check~=0.6.0
prometheus-client~=0.13.1
prometheus_flask_exporter~=0.19.0
lorem-text==2.1
lorem-text==2.1
# pdf2array
PyMuPDF==1.19.6

View File

@ -1,38 +0,0 @@
import argparse
from itertools import starmap
from pathlib import Path
import numpy as np
import pdf2image
from PIL import Image
from funcy import lmap
from cv_analysis.figure_detection.figure_detection_pipeline import make_figure_detection_pipeline
from cv_analysis.utils.draw import draw_rectangles
def parse_args():
    """Parse the command line of the figure annotation script.

    Returns:
        Namespace with the required ``pdf_path`` and ``output_folder`` options.
    """
    parser = argparse.ArgumentParser()
    required_options = (("--pdf_path", "-p"), ("--output_folder", "-o"))
    for flags in required_options:
        parser.add_argument(*flags, required=True)
    return parser.parse_args()
def annotate_figures(images):
    """Detect figures on every image and draw the detections onto it.

    Args:
        images: Re-iterable collection of page images; it is traversed once for
            detection and once for drawing.

    Returns:
        Lazy iterator of the values returned by ``draw_rectangles`` per image.
    """
    detect_figures = make_figure_detection_pipeline()
    detections_per_image = map(detect_figures, images)
    return starmap(draw_rectangles, zip(images, detections_per_image))
def save_as_pdf(images, output_folder, file_name):
    """Write ``images`` as one multi-page PDF into ``output_folder``.

    The file is stored as ``<output_folder>/<file_name>_annotated_figures.pdf``;
    the folder is created when missing.

    Args:
        images: Iterable of arrays accepted by ``Image.fromarray``, one per page.
        output_folder: Target directory.
        file_name: Stem used for the output file name.

    Raises:
        IndexError: If ``images`` is empty.
    """
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    pages = lmap(Image.fromarray, images)
    # The image that `save` is called on already becomes page one, so only the
    # remaining pages may be appended; passing the full list duplicated page one.
    pages[0].save(
        f"{output_folder}/{file_name}_annotated_figures.pdf",
        save_all=True,
        append_images=pages[1:],
    )
if __name__ == "__main__":
    # Script entry: render the PDF pages to arrays, annotate figures, save as PDF.
    args = parse_args()
    source_pdf = Path(args.pdf_path)
    pages = [np.array(page_image) for page_image in pdf2image.convert_from_path(args.pdf_path)]
    annotated_pages = annotate_figures(images=pages)
    save_as_pdf(annotated_pages, args.output_folder, source_pdf.stem)

47
scripts/annotate_pdf.py Normal file
View File

@ -0,0 +1,47 @@
import argparse
import json
from operator import itemgetter
from pathlib import Path
import fitz
from cv_analysis.server.pipeline import get_analysis_fn, make_analysis_pipeline
def parse_args():
    """Parse the command line of the PDF annotation script.

    Returns:
        Namespace with ``pdf_path``, ``output_folder``, ``type`` (one of
        "table", "layout", "figure") and the boolean ``verbose`` (off unless
        ``--verbose`` is given; ``--silent`` switches it off again).
    """
    parser = argparse.ArgumentParser()
    for positional in ("pdf_path", "output_folder"):
        parser.add_argument(positional)
    analysis_types = ["table", "layout", "figure"]
    parser.add_argument("--type", "-t", choices=analysis_types, required=True)
    # Both switches write to the same destination; the later one on the command line wins.
    for flag, action in (("--verbose", "store_true"), ("--silent", "store_false")):
        parser.add_argument(flag, dest="verbose", action=action)
    parser.set_defaults(verbose=False)
    return parser.parse_args()
def analyse_annotate_save(pdf, analysis_type, output_path, verbose):
    """Analyse ``pdf``, draw every resulting bbox on its page and save a copy.

    Args:
        pdf: PDF document as bytes.
        analysis_type: Name understood by ``get_analysis_fn``.
        output_path: Path the annotated PDF is written to.
        verbose: When true, the raw results are printed as indented JSON.
    """
    analysis_fn = get_analysis_fn(analysis_type)
    run_pipeline = make_analysis_pipeline(analysis_fn)
    page_results = list(run_pipeline(pdf))

    if verbose:
        print(json.dumps(page_results, indent=2))

    corner_keys = ("x1", "y1", "x2", "y2")
    with fitz.open(stream=pdf) as document:
        for page_result in page_results:
            page = document[page_result["index"]]
            for bbox in page_result["bboxes"]:
                corners = tuple(bbox[key] for key in corner_keys)
                page.draw_rect(corners, color=(0.5, 0.7, 0.2), width=2)
        # Saved while the document is still open.
        document.save(output_path)
if __name__ == "__main__":
    # Script entry: read the PDF, make sure the output folder exists, then analyse and annotate.
    args = parse_args()
    source_pdf = Path(args.pdf_path)
    pdf_bytes = source_pdf.read_bytes()
    Path(args.output_folder).mkdir(parents=True, exist_ok=True)
    output_path = f"{args.output_folder}/{source_pdf.stem}_annotated_{args.type}.pdf"
    analyse_annotate_save(pdf_bytes, args.type, output_path, args.verbose)

View File

@ -0,0 +1,96 @@
import argparse
import time
from functools import partial
from pathlib import Path
import fitz
import numpy as np
from funcy import lmap
from matplotlib import pyplot as plt
from cv_analysis.server.pipeline import make_analysis_pipeline, get_analysis_fn
def parse_args():
    """Parse the command line of the runtime evaluation script.

    Returns:
        Namespace with the positional ``pdf_folder``, ``output_folder`` and
        ``n_runs`` values, all as strings.
    """
    positionals = (
        ("pdf_folder", "Path to folder with PDFs to evaluate"),
        ("output_folder", "Path to folder where the Runtime plot should be stored"),
        ("n_runs", "Number of runs per test"),
    )
    parser = argparse.ArgumentParser()
    for name, description in positionals:
        parser.add_argument(name, help=description)
    return parser.parse_args()
def measure(fn, n_runs):
    """Wrap ``fn`` so that calling it reports runtime statistics instead of results.

    Args:
        fn: Callable returning an iterable; the iterable is fully consumed on
            every run so lazily evaluated generators are timed as well.
        n_runs: Number of times ``fn`` is executed per call of the wrapper.

    Returns:
        A function taking the same arguments as ``fn`` and returning the tuple
        ``(mean, standard deviation)`` of the run durations in seconds.
    """

    def run(*args, **kwargs):
        def _run():
            # perf_counter is monotonic and high resolution; time.time() can jump
            # when the system clock is adjusted and distort (or negate) a duration.
            start = time.perf_counter()
            list(fn(*args, **kwargs))  # Evaluate generators
            return time.perf_counter() - start

        runtimes = [_run() for _ in range(n_runs)]
        return np.mean(runtimes), np.std(runtimes)

    return run
def run_tests(pdf, test_cases, n_runs):
    """Time the analysis pipeline of every test case on ``pdf``.

    Args:
        pdf: PDF document as bytes.
        test_cases: Analysis type names understood by ``get_analysis_fn``.
        n_runs: Number of timed runs per test case.

    Returns:
        List with one ``measure`` result per test case, in input order.
    """
    timings = []
    for test_case in test_cases:
        analysis_pipe = make_analysis_pipeline(get_analysis_fn(test_case))
        timed_analysis_pipe = measure(analysis_pipe, n_runs)
        timings.append(timed_analysis_pipe(pdf))
    return timings
def to_ms_per_page(runtime, page_count):
    """Convert a total runtime in seconds to rounded milliseconds per page.

    Args:
        runtime: Runtime in seconds for the whole document.
        page_count: Number of pages the runtime is spread over.

    Returns:
        ``runtime / page_count * 1000`` rounded to zero decimals.
    """
    seconds_per_page = runtime / page_count
    return round(seconds_per_page * 1000, 0)
def measure_pdf(pdf_path, n_runs, cases=None):
    """Measure the per-page runtime of every test case on a single PDF.

    Args:
        pdf_path: Path of the PDF to evaluate.
        n_runs: Number of timed runs per test case.
        cases: Analysis type names to evaluate. When omitted, the module-level
            ``test_cases`` set by the script entry point is used, as before.

    Returns:
        Tuple ``(means, std)``: two lists with one ms/page value per test case.
    """
    if cases is None:
        # Legacy fallback: only available when the module runs as a script.
        cases = test_cases
    with open(pdf_path, "rb") as f:
        pdf = f.read()
    # Open the document only to read the page count, and close it again
    # instead of leaking the handle.
    with fitz.open(stream=pdf) as document:
        page_count = document.page_count
    format_fn = partial(to_ms_per_page, page_count=page_count)
    means, std = zip(*run_tests(pdf, cases, n_runs=n_runs))
    means, std = lmap(format_fn, means), lmap(format_fn, std)
    return means, std


def plot_results_and_save(results, labels, n_runs, test_pdf_paths, output_folder=None):
    """Draw a grouped bar chart of the measured runtimes and store it as PNG.

    Args:
        results: One ``(means, std)`` tuple per PDF, as returned by ``measure_pdf``.
        labels: Tick labels, one per test case.
        n_runs: Number of runs per test; shown in the title and the file name.
        test_pdf_paths: Paths of the evaluated PDFs; their stems label the bars.
        output_folder: Folder the plot is written to. When omitted, the
            module-level ``args.output_folder`` of the script is used, as before.
    """
    if output_folder is None:
        # Legacy fallback: only available when the module runs as a script.
        output_folder = args.output_folder

    _fig, ax = plt.subplots()
    width = 0.2
    x_labels = np.arange(len(labels))
    plt.xticks(ticks=x_labels, labels=labels, rotation=90)
    plt.grid(linestyle="dotted")

    for idx, (result, test_pdf_path) in enumerate(zip(results, test_pdf_paths)):
        # Shift each PDF's bars sideways so the groups sit next to each other.
        x = x_labels + idx * width
        means, std = result
        bars = ax.bar(x, means, width, yerr=std, label=f"{test_pdf_path.stem}")
        ax.bar_label(bars)

    ax.set_ylabel("ms/page")
    ax.set_xlabel("Cv-analysis operation")
    ax.set_title(f"Cv-analysis runtime estimation {n_runs=}")
    ax.legend(loc=0)

    Path(output_folder).mkdir(parents=True, exist_ok=True)
    output_path = f"{output_folder}/cv_analysis_runtime_{n_runs=}.png"
    plt.savefig(output_path, dpi=200, bbox_inches="tight", pad_inches=0.5)
    plt.close()


def measure_and_save_plot(args, test_cases):
    """Measure every PDF in ``args.pdf_folder`` and save the runtime plot.

    Args:
        args: Parsed command line with ``pdf_folder``, ``output_folder`` and ``n_runs``.
        test_cases: Analysis type names to evaluate.
    """
    n_runs = int(args.n_runs)
    # Pass the test cases and output folder explicitly instead of relying on
    # same-named module globals.
    measure_pdf_fn = partial(measure_pdf, n_runs=n_runs, cases=test_cases)
    test_pdf_paths = list(Path(args.pdf_folder).glob("*.pdf"))
    results = lmap(measure_pdf_fn, test_pdf_paths)
    plot_results_and_save(results, test_cases, n_runs, test_pdf_paths, output_folder=args.output_folder)
if __name__ == "__main__":
    # Script entry point. `test_cases` and `args` become module-level names here;
    # measure_pdf reads `test_cases` and plot_results_and_save reads `args` as
    # globals, so these two names must not be renamed.
    test_cases = ["table", "layout", "figure"]
    args = parse_args()
    measure_and_save_plot(args, test_cases)

View File

@ -1,6 +1,5 @@
pytest_plugins = [
"test.fixtures.table_parsing",
"test.fixtures.server",
"test.fixtures.figure_detection",
]

View File

@ -4,7 +4,7 @@ import cv2
import numpy as np
import pytest
from lorem_text import lorem
from funcy import first
from cv_analysis.figure_detection.figure_detection_pipeline import (
make_figure_detection_pipeline,
)

View File

@ -14,7 +14,7 @@ class TestFindPrimaryTextRegions:
assert not list(results)
@pytest.mark.parametrize("image_size", [(200, 200), (500, 500), (800, 800)])
@pytest.mark.parametrize("n_images", [1, 2])
@pytest.mark.parametrize("n_images", [1])
def test_page_without_text_yields_figures(self, figure_detection_pipeline, page_with_images, image_size):
results = figure_detection_pipeline(page_with_images)
result_figures_size = map(lambda x: (x.w, x.h), results)

View File

@ -0,0 +1,24 @@
import fitz
import numpy as np
import pytest
from cv_analysis.utils.pdf2image import pdf_to_image_metadata_pairs
@pytest.fixture
def pdf(n_pages):
    """Build an in-memory PDF with ``n_pages`` pages, each holding one line of text.

    Returns:
        The value of ``doc.write()`` for the generated document.
    """
    document = fitz.open()
    for _ in range(n_pages):
        text_origin = fitz.Point(50, 100)
        document.new_page().insert_text(text_origin, "De gustibus non est disputandum.", fontsize=30)
    return document.write()
@pytest.mark.parametrize("n_pages", [1])
def test_pdf_to_array_and_metadata(pdf, n_pages):
    """Every page of the PDF becomes one pair of image array and metadata dict."""
    pairs = list(pdf_to_image_metadata_pairs(pdf))

    # Without this check the loop below would pass vacuously if no page were produced.
    assert len(pairs) == n_pages

    for image_metadata_pair in pairs:
        assert isinstance(image_metadata_pair.image, np.ndarray)
        assert image_metadata_pair.image.shape == (2339, 1653, 3)  # Height, Width, Color channels
        assert isinstance(image_metadata_pair.metadata, dict)

View File

@ -1,13 +0,0 @@
import pytest
from funcy import first
from cv_analysis.server.stream import make_streamable_analysis_fn
@pytest.mark.parametrize("operation", ["mock"])
@pytest.mark.parametrize("image_size", [(200, 200), (500, 500), (800, 800)])
def test_make_analysis_fn(analysis_fn_mock, random_image_metadata_package, expected_analyse_metadata):
    """The first streamed result carries the expected analysis metadata."""
    streamable_analyse = make_streamable_analysis_fn(analysis_fn_mock)
    first_result = first(streamable_analyse(random_image_metadata_package))
    assert first_result["metadata"] == expected_analyse_metadata