Merge branch 'table_lines' into 'master'

Table lines

See merge request redactmanager/cv-analysis-service!9
This commit is contained in:
Isaac Riley 2024-03-08 10:42:54 +01:00
commit 95abb5d5fb
4 changed files with 195 additions and 38 deletions

1
.gitignore vendored
View File

@ -8,6 +8,7 @@ venv/
.DS_Store
# Project folders
scratch/
*.vscode/
.idea
*_app

View File

@ -2,31 +2,42 @@ import sys
from dataclasses import asdict
from operator import truth
from funcy import lmap, flatten
from funcy import flatten, lmap
from pdf2img.conversion import convert_pages_to_images
from pdf2img.default_objects.image import ImageInfo, ImagePlus
from pdf2img.default_objects.rectangle import RectanglePlus
from cv_analysis.figure_detection.figure_detection import detect_figures
from cv_analysis.table_parsing import parse_tables
from cv_analysis.table_parsing import parse_lines, parse_tables
from cv_analysis.utils.structures import Rectangle
from pdf2img.conversion import convert_pages_to_images
from pdf2img.default_objects.image import ImagePlus, ImageInfo
from pdf2img.default_objects.rectangle import RectanglePlus
def get_analysis_pipeline(operation, table_parsing_skip_pages_without_images=True):
    """Return the analysis pipeline registered for ``operation``.

    Supported operations:

    * ``"table"`` - ``parse_lines`` formatted by ``table_parsing_formatter``.
    * ``"table_cells"`` - ``parse_tables`` formatted by
      ``table_parsing_cells_formatter``.
    * ``"figure"`` - ``detect_figures`` formatted by
      ``figure_detection_formatter``.

    Every pipeline is built with ``dpi=200``.

    Args:
        operation: Name of the analysis to run.
        table_parsing_skip_pages_without_images: Forwarded as
            ``skip_pages_without_images`` for the two table operations. The
            figure pipeline does not receive it and therefore uses the
            default of ``make_analysis_pipeline``.

    Returns:
        The callable produced by ``make_analysis_pipeline``.

    Raises:
        RuntimeError: If ``operation`` is not one of the supported names.
    """
    if operation == "table":
        return make_analysis_pipeline(
            parse_lines,
            table_parsing_formatter,
            dpi=200,
            skip_pages_without_images=table_parsing_skip_pages_without_images,
        )
    if operation == "table_cells":
        return make_analysis_pipeline(
            parse_tables,
            table_parsing_cells_formatter,
            dpi=200,
            skip_pages_without_images=table_parsing_skip_pages_without_images,
        )
    if operation == "figure":
        return make_analysis_pipeline(
            detect_figures, figure_detection_formatter, dpi=200
        )
    # A bare ``raise`` has no active exception here and would only produce
    # "No active exception to reraise". Keep the RuntimeError type but say
    # which operation was rejected.
    raise RuntimeError(
        f"Unsupported operation {operation!r}; "
        "expected one of 'table', 'table_cells', 'figure'"
    )
def make_analysis_pipeline(analysis_fn, formatter, dpi, skip_pages_without_images=False):
def make_analysis_pipeline(
analysis_fn, formatter, dpi, skip_pages_without_images=False
):
def analyse_pipeline(pdf: bytes, index=None):
def parse_page(page: ImagePlus):
image = page.asarray()
@ -36,7 +47,12 @@ def make_analysis_pipeline(analysis_fn, formatter, dpi, skip_pages_without_image
infos = formatter(rects, page, dpi)
return infos
pages = convert_pages_to_images(pdf, index=index, dpi=dpi, skip_pages_without_images=skip_pages_without_images)
pages = convert_pages_to_images(
pdf,
index=index,
dpi=dpi,
skip_pages_without_images=skip_pages_without_images,
)
results = map(parse_page, pages)
yield from flatten(filter(truth, results))
@ -44,9 +60,15 @@ def make_analysis_pipeline(analysis_fn, formatter, dpi, skip_pages_without_image
return analyse_pipeline
def table_parsing_formatter(rects, page: ImagePlus, dpi):
def table_parsing_formatter(lines: list[dict[str, float]], page: ImagePlus, dpi):
    """Bundle the table lines found on one page with that page's metadata.

    Args:
        lines: Line dictionaries to report; passed through unchanged.
        page: Page whose ``asdict(natural_index=True)`` output becomes the
            ``"pageInfo"`` entry.
        dpi: Unused here; accepted so the signature matches the other
            formatters called as ``formatter(rects, page, dpi)``.

    Returns:
        A dict with the keys ``"pageInfo"`` and ``"tableLines"``.
    """
    page_info = page.asdict(natural_index=True)
    formatted = {"pageInfo": page_info, "tableLines": lines}
    return formatted
def table_parsing_cells_formatter(rects, page: ImagePlus, dpi):
def format_rect(rect: Rectangle):
rect_plus = RectanglePlus.from_pixels(*rect.xyxy(), page.info, alpha=False, dpi=dpi)
rect_plus = RectanglePlus.from_pixels(
*rect.xyxy(), page.info, alpha=False, dpi=dpi
)
return rect_plus.asdict(derotate=True)
bboxes = lmap(format_rect, rects)
@ -56,7 +78,11 @@ def table_parsing_formatter(rects, page: ImagePlus, dpi):
def figure_detection_formatter(rects, page, dpi):
    """Convert the figure rectangles of one page into plain dictionaries.

    Each rectangle is turned into a ``RectanglePlus`` via ``from_pixels``
    (using ``page.info`` and ``dpi``, with ``alpha=False``), wrapped in an
    ``ImageInfo`` together with its non-derotated bounding box and alpha
    value, and finally flattened with ``dataclasses.asdict``.

    Args:
        rects: Iterable of rectangles exposing ``xyxy()``.
        page: Page object providing ``info``.
        dpi: Resolution passed on to ``RectanglePlus.from_pixels``.

    Returns:
        A list with one dict per input rectangle, in input order.
    """
    formatted = []
    for rect in rects:
        rect_plus = RectanglePlus.from_pixels(
            *rect.xyxy(), page.info, alpha=False, dpi=dpi
        )
        image_info = ImageInfo(
            page.info, rect_plus.asbbox(derotate=False), rect_plus.alpha
        )
        formatted.append(asdict(image_info))
    return formatted

View File

@ -1,20 +1,19 @@
from functools import partial
from itertools import chain, starmap
from operator import attrgetter
import cv2
import numpy as np
from funcy import lmap, lfilter
from funcy import lfilter, lmap
from cv_analysis.layout_parsing import parse_layout
from cv_analysis.utils.postprocessing import remove_isolated # xywh_to_vecs, xywh_to_vec_rect, adjacent1d
from cv_analysis.utils.postprocessing import (
remove_isolated,
) # xywh_to_vecs, xywh_to_vec_rect, adjacent1d
from cv_analysis.utils.structures import Rectangle
from cv_analysis.utils.visual_logging import vizlogger
def add_external_contours(image, image_h_w_lines_only):
contours, _ = cv2.findContours(image_h_w_lines_only, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
contours, _ = cv2.findContours(
image_h_w_lines_only, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
)
for cnt in contours:
x, y, w, h = cv2.boundingRect(cnt)
cv2.rectangle(image, (x, y), (x + w, y + h), 255, 1)
@ -78,8 +77,10 @@ def isolate_vertical_and_horizontal_components(img_bin):
img_bin_extended = img_bin_h | img_bin_v
th1, img_bin_extended = cv2.threshold(img_bin_extended, 120, 255, cv2.THRESH_BINARY)
img_bin_final = cv2.dilate(img_bin_extended, np.ones((1, 1), np.uint8), iterations=1)
_, img_bin_extended = cv2.threshold(img_bin_extended, 120, 255, cv2.THRESH_BINARY)
img_bin_final = cv2.dilate(
img_bin_extended, np.ones((1, 1), np.uint8), iterations=1
)
# add contours before lines are extended by blurring
img_bin_final = add_external_contours(img_bin_final, img_lines_raw)
@ -88,7 +89,7 @@ def isolate_vertical_and_horizontal_components(img_bin):
def find_table_layout_boxes(image: np.array):
def is_large_enough(box):
(x, y, w, h) = box
(_, _, w, h) = box
if w * h >= 100000:
return Rectangle.from_xywh(box)
@ -108,7 +109,9 @@ def turn_connected_components_into_rects(image: np.array):
x1, y1, w, h, area = stat
return area > 2000 and w > 35 and h > 25
_, _, stats, _ = cv2.connectedComponentsWithStats(~image, connectivity=8, ltype=cv2.CV_32S)
_, _, stats, _ = cv2.connectedComponentsWithStats(
~image, connectivity=8, ltype=cv2.CV_32S
)
stats = lfilter(is_large_enough, stats)
if stats:
@ -130,10 +133,114 @@ def parse_tables(image: np.array, show=False):
image = preprocess(image)
image = isolate_vertical_and_horizontal_components(image)
rects = turn_connected_components_into_rects(image)
#print(rects, "\n\n")
rects = list(map(Rectangle.from_xywh, rects))
#print(rects, "\n\n")
rects = remove_isolated(rects)
#print(rects, "\n\n")
return rects
# def make_lines(image: np.array, horizontal=True, kernel_length=40)
def detect_horizontal_lines(image_bin: np.array, kernel_length=40):
    """Build a binary mask from the horizontal structures of ``image_bin``.

    The image goes through four steps: a morphological opening with a 1x48
    kernel, two dilation passes with a 1x30 kernel, ``apply_motion_blur``
    with angle argument 0, and a binary threshold at 120 (output 0 or 255).

    Args:
        image_bin: Input image. NOTE(review): presumably a binarised page
            with the lines as non-zero foreground - confirm against
            ``preprocess``.
        kernel_length: Currently unused; the kernel sizes are hard-coded.

    Returns:
        The thresholded image.
    """
    min_run = 48
    opening_kernel = np.ones((1, min_run), np.uint8)
    dilation_kernel = np.ones((1, 30), np.uint8)

    opened = cv2.morphologyEx(image_bin, cv2.MORPH_OPEN, opening_kernel)
    widened = cv2.dilate(opened, dilation_kernel, iterations=2)
    blurred = apply_motion_blur(widened, 0)
    # cv2.threshold returns (threshold_used, image); only the image matters.
    binarised = cv2.threshold(blurred, 120, 255, cv2.THRESH_BINARY)[1]
    return binarised
def detect_vertical_lines(image_bin: np.array, kernel_length=40):
    """Build a binary mask from the vertical structures of ``image_bin``.

    The image goes through four steps: a morphological opening with a 48x1
    kernel, two dilation passes with a 30x1 kernel, ``apply_motion_blur``
    with angle argument 90, and a binary threshold at 120 (output 0 or 255).

    Args:
        image_bin: Input image. NOTE(review): presumably a binarised page
            with the lines as non-zero foreground - confirm against
            ``preprocess``.
        kernel_length: Currently unused; the kernel sizes are hard-coded.

    Returns:
        The thresholded image.
    """
    min_run = 48
    opening_kernel = np.ones((min_run, 1), np.uint8)
    dilation_kernel = np.ones((30, 1), np.uint8)

    opened = cv2.morphologyEx(image_bin, cv2.MORPH_OPEN, opening_kernel)
    lengthened = cv2.dilate(opened, dilation_kernel, iterations=2)
    blurred = apply_motion_blur(lengthened, 90)
    # cv2.threshold returns (threshold_used, image); only the image matters.
    binarised = cv2.threshold(blurred, 120, 255, cv2.THRESH_BINARY)[1]
    return binarised
def detect_endpoints(
    image: np.array, is_horizontal: bool
) -> list[tuple[int, int, int, int]]:
    """Find line segments in ``image`` and fuse neighbouring detections.

    Segments are ``(x1, y1, x2, y2)`` tuples taken from
    ``cv2.HoughLinesP``. They are sorted by ``y1`` (horizontal) or ``x1``
    (vertical). Each segment is then compared with the most recently kept
    one; when they are judged to belong to the same line, the kept segment
    is replaced by a fused tuple, otherwise the new segment is kept as is.

    Args:
        image: Image handed to ``cv2.HoughLinesP``.
        is_horizontal: Selects the sort axis, the closeness test and the
            fusion rule.

    Returns:
        The fused segments, or an empty list when nothing was detected.
    """
    hough_result = cv2.HoughLinesP(
        image,  # Input edge image
        1,  # Distance resolution in pixels
        np.pi / 180,  # Angle resolution in radians
        threshold=100,  # Min number of votes for valid line
        minLineLength=200,  # Min allowed length of line
        maxLineGap=10,  # Max allowed gap between line for joining them
    )
    if hough_result is None:
        return []

    # Each Hough entry wraps its four coordinates in one extra dimension.
    segments = [tuple(entry[0]) for entry in hough_result]
    if not segments:
        return segments

    horizontal = bool(is_horizontal)
    axis = int(horizontal)  # 1 -> compare y values, 0 -> compare x values
    sort_axis = int(is_horizontal)
    segments = sorted(segments, key=lambda seg: seg[sort_axis])

    def belongs_to_same_line(kept, candidate) -> bool:
        """Tell whether ``candidate`` should be fused into ``kept``."""
        start_gap = abs(kept[axis] - candidate[axis])
        end_gap = abs(kept[axis + 2] - candidate[axis + 2])
        if not (start_gap < 15 and end_gap < 15):
            return False
        if horizontal:
            # Horizontal segments are fused on closeness alone.
            return True
        # NOTE(review): this passes unless both comparisons fail, so it is
        # not a strict interval-overlap test - confirm the intent.
        return kept[1] >= candidate[3] or kept[3] >= candidate[1]

    # Horizontal: (min x1, min y1, max x2, min y2).
    # Vertical:   (min x1, max y1, min x2, min y2).
    # NOTE(review): the vertical rule only lengthens a segment when
    # y1 >= y2 - confirm the endpoint order HoughLinesP delivers.
    pick_y1, pick_x2 = (min, max) if horizontal else (max, min)

    fused = [segments[0]]
    for candidate in segments[1:]:
        kept = fused[-1]
        if not belongs_to_same_line(kept, candidate):
            fused.append(candidate)
            continue
        fused[-1] = (
            min(kept[0], candidate[0]),
            pick_y1(kept[1], candidate[1]),
            pick_x2(kept[2], candidate[2]),
            min(kept[3], candidate[3]),
        )
    return fused
def parse_lines(image: np.array, show=False) -> list[dict[str, float]]:
    """Detect table lines in ``image`` and return their relative endpoints.

    The image is run through ``preprocess``. Horizontal and vertical line
    masks are then built and passed to ``detect_endpoints``. Each resulting
    segment is converted to a dict whose coordinates are divided by the
    preprocessed image's width (x) or height (y).

    Args:
        image: Page image to analyse.
        show: Currently unused.

    Returns:
        One ``{"x1", "y1", "x2", "y2"}`` dict per segment, horizontal
        segments first, then vertical ones.
    """
    image = preprocess(image)

    horizontal_line_img = detect_horizontal_lines(image)
    vertical_line_img = detect_vertical_lines(image)

    horizontal_endpoints = detect_endpoints(horizontal_line_img, is_horizontal=True)
    vertical_endpoints = detect_endpoints(vertical_line_img, is_horizontal=False)

    # NOTE(review): the two-value unpack assumes ``preprocess`` returns a
    # single-channel (2-D) image - confirm.
    ymax, xmax = image.shape

    def format_quad(quad: tuple[int, int, int, int]) -> dict[str, float]:
        """Order the endpoints and scale them by the image size."""
        x1, y1, x2, y2 = quad
        # Swap the endpoints when the first lies more than 5 px beyond the
        # second on either axis; smaller differences are left untouched.
        if x1 > (x2 + 5) or y1 > (y2 + 5):
            x1, y1, x2, y2 = x2, y2, x1, y1
        return {"x1": x1 / xmax, "y1": y1 / ymax, "x2": x2 / xmax, "y2": y2 / ymax}

    return [format_quad(quad) for quad in horizontal_endpoints + vertical_endpoints]

View File

@ -2,7 +2,11 @@ import fitz
import numpy as np
import pytest
from cv_analysis.server.pipeline import table_parsing_formatter, figure_detection_formatter, make_analysis_pipeline
from cv_analysis.server.pipeline import (
figure_detection_formatter,
make_analysis_pipeline,
table_parsing_formatter,
)
from cv_analysis.utils.structures import Rectangle
@ -21,10 +25,15 @@ def empty_pdf():
@pytest.fixture
def expected_formatted_analysis_result(operation):
if operation == "table":
if operation == "table_cells":
return [
{
"pageInfo": {"number": 1, "rotation": 0, "width": 595.0, "height": 842.0},
"pageInfo": {
"number": 1,
"rotation": 0,
"width": 595.0,
"height": 842.0,
},
"tableCells": [
{
"x0": 0.0,
@ -40,8 +49,20 @@ def expected_formatted_analysis_result(operation):
if operation == "figure":
return [
{
"pageInfo": {"number": 0, "rotation": 0, "width": 595.0, "height": 842.0},
"boundingBox": {"x0": 0.0, "y0": 0.0, "x1": 15.12, "y1": 15.12, "width": 15.12, "height": 15.12},
"pageInfo": {
"number": 0,
"rotation": 0,
"width": 595.0,
"height": 842.0,
},
"boundingBox": {
"x0": 0.0,
"y0": 0.0,
"x1": 15.12,
"y1": 15.12,
"width": 15.12,
"height": 15.12,
},
"alpha": False,
}
]
@ -49,7 +70,7 @@ def expected_formatted_analysis_result(operation):
@pytest.fixture
def formatter(operation):
if operation == "table":
if operation == "table_cells":
return table_parsing_formatter
elif operation == "figure":
return figure_detection_formatter
@ -57,8 +78,10 @@ def formatter(operation):
raise
@pytest.mark.parametrize("operation", ["table_cells", "figure"])
def test_analysis_pipeline(empty_pdf, formatter, expected_formatted_analysis_result):
    """Run a pipeline built from the mock analysis function on ``empty_pdf``.

    The collected output must equal the fixture result for the
    parametrised operation.
    """
    # NOTE(review): the ``formatter`` fixture returns
    # ``table_parsing_formatter`` for "table_cells", while the expected
    # result is keyed by "tableCells" - confirm the fixture should not use
    # the cells formatter instead.
    pipeline = make_analysis_pipeline(
        analysis_fn_mock, formatter, dpi=200, skip_pages_without_images=False
    )
    produced = list(pipeline(empty_pdf))
    assert produced == expected_formatted_analysis_result