feat: remove relextrema because not working; use pure numpy instead

This commit is contained in:
iriley 2024-04-26 14:21:00 +02:00
parent 1d3b077ace
commit f7a0db2651
3 changed files with 145 additions and 84 deletions

View File

@ -1,12 +1,12 @@
from operator import itemgetter
from pathlib import Path
from typing import Callable, Optional
from typing import Tuple
from typing import Callable, Optional, Tuple
import cv2
import matplotlib.pyplot as plt
import numpy as np
from kn_utils.logging import logger
from numpy import ndarray as Array
from scipy.signal import argrelextrema
from scipy.stats import norm
@ -38,14 +38,20 @@ def save_plot(arr: Array, name: str, title: str = "") -> None:
plt.savefig(Path(str(name) + ".png"))
def save_lines(
    img: Array, lines: list[dict[str, int]], path: str = "/tmp/lines.png"
) -> None:
    """Draw line segments on a grayscale image and write the overlay to disk.

    Args:
        img: Single-channel image; it is converted with ``COLOR_GRAY2RGB``
            before drawing, so the caller's array is not drawn on.
        lines: Segments given as dicts with the keys ``x1``, ``y1``, ``x2``
            and ``y2``.
        path: Destination file. Defaults to the location that was
            previously hard-coded, so existing callers are unaffected.
    """
    canvas = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    endpoints = itemgetter("x1", "y1", "x2", "y2")
    for segment in lines:
        x1, y1, x2, y2 = endpoints(segment)
        canvas = cv2.line(
            canvas, (x1, y1), (x2, y2), color=(0, 255, 0), thickness=3
        )
    # imwrite reports failure through its return value instead of raising;
    # surface it so a missing overlay file is not a silent mystery.
    if not cv2.imwrite(path, canvas):
        logger.warning(f"could not write line overlay to {path}")
def make_gaussian_kernel(kernel_size: int, sd: float) -> Array:
    """Return a 1-D Gaussian kernel whose weights sum to one.

    Args:
        kernel_size: Requested number of taps. An even value is bumped to
            the next odd number so the kernel has a centre tap.
        sd: Standard deviation of the Gaussian, in taps.

    Returns:
        The normal density sampled at the integer offsets
        ``-half_width .. half_width`` and divided by its sum.
    """
    if kernel_size % 2 == 0:
        kernel_size += 1
    half_width = int((kernel_size - 1) / 2)
    offsets = np.arange(-half_width, half_width + 1)
    weights = norm.pdf(offsets, scale=sd)
    return weights / np.sum(weights)
@ -56,31 +62,21 @@ def make_gaussian_nonpositive_kernel(kernel_size: int, sd: float) -> Array:
wing_size = int((kernel_size - 1) / 2)
xvals = np.arange(-wing_size, wing_size + 1)
kernel = norm.pdf(xvals, scale=sd)
# maxval, minval = np.max(kernel), np.min(kernel)
# diff = maxval - minval
# kernel += (diff / (1 - ratio))
kernel /= np.sum(kernel)
return kernel
def make_quadratic_kernel(kernel_size: int, ratio: float) -> Array:
    """Return a normalised 1-D kernel shaped like an inverted parabola.

    The raw profile ``-x**2`` is lifted by ``(max - min) / (1 - ratio)``
    before normalisation, which makes the outermost tap weigh ``ratio``
    times the centre tap.

    Args:
        kernel_size: Requested number of taps. An even value is bumped to
            the next odd number so the kernel has a centre tap.
        ratio: Edge-to-centre weight ratio of the resulting kernel.

    Returns:
        Array of weights summing to one.

    Raises:
        ValueError: If ``kernel_size`` is negative.
    """
    kernel_size += int(not kernel_size % 2)
    if kernel_size < 1:
        raise ValueError(f"kernel_size must not be negative, got {kernel_size}")
    wing_size = (kernel_size - 1) // 2

    # A single tap (0 / 0) and ratio == 1 (division by zero) both used to
    # produce NaN kernels. Their well-defined value is the flat kernel: one
    # tap carries all the weight, and edge == centre means uniform weights.
    if wing_size == 0 or ratio == 1:
        return np.full(kernel_size, 1.0 / kernel_size)

    kernel = -(np.arange(-wing_size, wing_size + 1, dtype=float) ** 2)
    kernel += (np.max(kernel) - np.min(kernel)) / (1 - ratio)
    kernel /= np.sum(kernel)
    return kernel
@ -93,7 +89,8 @@ def min_avg_for_interval(filtered: Array, interval: int) -> float:
def search_intervals(filtered: Array, min_interval: int, max_interval: int):
    """Pick the interval in ``[min_interval, max_interval]`` with the lowest score.

    Each candidate interval is evaluated with ``min_avg_for_interval``; its
    result is unpacked, the first element is used as the score to minimise
    and the second is returned next to the winning interval.
    NOTE(review): ``min_avg_for_interval`` is defined outside this view and
    is presumably returning a (score, payload) pair - confirm.

    Returns:
        ``(best_interval, payload_of_best_interval)``. On ties the smallest
        interval wins because candidates are scanned in ascending order.
    """
    candidates = (
        (width, *min_avg_for_interval(filtered, width))
        for width in range(min_interval, max_interval + 1)
    )
    winner = min(candidates, key=itemgetter(1))
    return winner[0], winner[2]
@ -103,7 +100,7 @@ def filter_array(
array: Array,
sum_filter: Array,
padding: Optional[Array] = None,
pad_value_function: Callable[[Array], float] = np.mean,
pad_value_function: Callable[[Array], float] = lambda x: 255.0, # np.mean,
) -> Array:
if sum_filter is None:
return array
@ -116,24 +113,74 @@ def filter_array(
return np.convolve(np.concatenate((padding, array, padding)), sum_filter, "valid")
# Smoothing stages as (kernel width, standard deviation) pairs. The widths
# are even, so make_gaussian_kernel bumps each of them to the next odd size.
ROW_FILTER1_WIDTH, ROW_FILTER1_SD = 30, 6
ROW_FILTER2_WIDTH, ROW_FILTER2_SD = 20, 4

COL_FILTER1_WIDTH, COL_FILTER1_SD = 90, 15
COL_FILTER2_WIDTH, COL_FILTER2_SD = 70, 12
COL_FILTER3_WIDTH, COL_FILTER3_SD = 200, 20

# Kernels per axis, keyed by stage number. A stage set to None is a
# pass-through: filter_array returns its input unchanged for a None filter.
FILTERS = {
    "row": {
        1: make_gaussian_kernel(ROW_FILTER1_WIDTH, ROW_FILTER1_SD),
        2: make_gaussian_kernel(ROW_FILTER2_WIDTH, ROW_FILTER2_SD),
        3: None,
    },
    "col": {
        1: make_gaussian_kernel(COL_FILTER1_WIDTH, COL_FILTER1_SD),
        2: make_gaussian_kernel(COL_FILTER2_WIDTH, COL_FILTER2_SD),
        3: make_gaussian_kernel(COL_FILTER3_WIDTH, COL_FILTER3_SD),
    },
}
def filter_fp_col_lines(line_list: list[int], filt_sums: Array) -> list[int]:
    """Drop candidate lines that barely rise above the minimum that follows.

    Candidates are paired positionally with the strict interior local minima
    of ``filt_sums``. If the first minimum lies before the first candidate it
    is skipped and the last sample stands in as the final reference. A
    candidate is kept only when it exceeds its paired minimum by more than
    the standard deviation of ``filt_sums``.

    Args:
        line_list: Indices into ``filt_sums`` of candidate maxima, ascending.
        filt_sums: The smoothed 1-D profile the candidates were taken from.

    Returns:
        The surviving candidate indices. Candidates beyond the number of
        available reference minima are dropped, because the pairing is
        positional.
    """
    # No candidates at all (e.g. a flat profile): nothing to filter. The
    # original indexed line_list[0] here and raised IndexError.
    if len(line_list) == 0:
        return []

    inner = filt_sums[1:-1]
    is_minimum = (inner < filt_sums[:-2]) & (inner < filt_sums[2:])
    # + 1 maps positions in the [1:-1] slice back to indices of filt_sums.
    centers = (np.flatnonzero(is_minimum) + 1).tolist()

    last_index = len(filt_sums) - 1
    if not centers:
        # Single-peak profile without an interior minimum: compare against
        # the last sample, the same fallback used below, instead of raising
        # IndexError on centers[0].
        centers = [last_index]
    elif line_list[0] > centers[0]:
        centers = centers[1:] + [last_index]

    mindiff = np.std(filt_sums)
    return [
        peak
        for peak, valley in zip(line_list, centers)
        if (filt_sums[peak] - filt_sums[valley]) > mindiff
    ]
# Find candidate table-line positions along one axis of `table_array`.
# The image is averaged along axis 1 when `horizontal` is true (one value per
# row) and along axis 0 otherwise; the resulting profile is smoothed with the
# FILTERS stages for that axis and its strict local maxima are returned.
# NOTE(review): this span looks like a diff rendered without +/- markers, so
# superseded and current statements appear side by side (two thresholds,
# argrelextrema next to the numpy neighbour test) - confirm against the
# repository which statements are live before relying on statement order.
# NOTE(review): the signature says `-> Array`, but the `list(...)` result
# below is a Python list - confirm what callers expect.
def get_lines_either(table_array: Array, horizontal=True) -> Array:
# Selects both the FILTERS sub-dict and, via int(horizontal), the mean axis.
key = "row" if horizontal else "col"
THRESHOLD = 0.4
filters = FILTERS
sums = np.mean(table_array, axis=int(horizontal))
sums = np.maximum(sums, (sums < THRESHOLD))
# save_plot(rows, name=save_path / "rows", title="raw row averages")
filtered_sums = filter_array(sums, FILTERS[key][1]) # ROW_FILTER1)
threshold = 0.3 * 255 # np.mean(sums) - (1 + 2 * horizontal) * np.std(sums)
# 1000.0 wherever the mean intensity is below the threshold, 0.0 elsewhere.
predicate = 1000.0 * (sums < threshold)
# Each interior sample becomes the maximum of itself and the predicate at its
# own position and at both neighbours, so a below-threshold sample lifts
# itself and its direct neighbours to 1000.0. The [1:-1] / [:-2] / [2:]
# slicing makes the result two samples shorter than the input profile.
sums = np.maximum(
np.maximum(sums[1:-1], predicate[1:-1]),
np.maximum(predicate[:-2], predicate[2:]),
)
# Smoothing stages are applied in order; filter_array returns its input
# unchanged when the stage's filter is None.
filtered_sums = filter_array(sums, FILTERS[key][1])
filtered_sums = filter_array(filtered_sums, FILTERS[key][2])
filtered_sums = filter_array(filtered_sums, FILTERS[key][3])
# Strict local maxima: samples greater than both neighbours. The `*` between
# the boolean arrays acts as a logical AND, and `+ 1` maps positions in the
# [1:-1] slice back to indices of filtered_sums.
lines = list(
np.where(
(filtered_sums[1:-1] > filtered_sums[:-2])
* (filtered_sums[1:-1] > filtered_sums[2:])
)[0]
+ 1
)
# Column candidates are additionally pruned by filter_fp_col_lines.
if not horizontal:
filtered_sums = filter_array(filtered_sums, FILTERS[key][2]) # ROW_FILTER2)
lines = argrelextrema(filtered_sums, np.greater)[0]
lines = filter_fp_col_lines(lines, filtered_sums)
return lines
@ -143,7 +190,9 @@ def img_bytes_to_array(img_bytes: bytes) -> Array:
def infer_lines(img: Array) -> dict[str, list[dict[str, int]]]:
# cv2.GaussianBlur(img,(15,5),cv2.BORDER_DEFAULT)
cv2.imwrite("/tmp/table.png", img)
_, img = cv2.threshold(img, 220, 255, cv2.THRESH_BINARY)
cv2.imwrite("/tmp/table_bin.png", img)
h, w = map(int, img.shape)
row_vals = map(int, get_lines_either(img, horizontal=True))
col_vals = map(int, get_lines_either(img, horizontal=False))
@ -152,4 +201,6 @@ def infer_lines(img: Array) -> dict[str, list[dict[str, int]]]:
{"x1": c, "y1": 0, "x2": c, "y2": h} for c in col_vals
]
save_lines(img, lines)
return {"tableLines": lines, "imageInfo": {"height": h, "width": w}}

View File

@ -7,11 +7,12 @@ import fitz
from kn_utils.logging import logger
def annotate_pdf(pdf: Union[str, bytes, Path], annotations, output_path: Union[str, Path] = None):
def annotate_pdf(
pdf: Union[str, bytes, Path], annotations, output_path: Union[str, Path] = None
):
pdf_bytes = provide_byte_stream(pdf)
with fitz.open(stream=pdf_bytes) as pdf_handle:
for page_annotations in annotations:
# FIXME: Adapt to line drawing
index = page_annotations["pageNum"]
annotate_page(pdf_handle[index], page_annotations)
output_path = output_path or "/tmp/annotated.pdf"
@ -20,16 +21,21 @@ def annotate_pdf(pdf: Union[str, bytes, Path], annotations, output_path: Union[s
# Draw the predictions for one page onto `page`: a rectangle plus a text label
# for every entry under "boxes" and a straight line for every entry under
# "tableLines".
# NOTE(review): this span looks like a diff rendered without +/- markers; the
# commented-out statements and the second `for line in ...` header appear to
# be the superseded version of the live code next to them - confirm against
# the repository before relying on statement order.
def annotate_page(page: fitz.Page, prediction):
# for box in prediction["boxes"]:
# bbox = itemgetter("x1", "y1", "x2", "y2")(box["box"])
# label, probability, uuid = itemgetter("label", "probability", "uuid")(box)
# `.get(..., [])` lets a prediction without a "boxes" key skip this loop.
for box in prediction.get("boxes", []):
bbox = itemgetter("x1", "y1", "x2", "y2")(box["box"])
label, probability, uuid = itemgetter("label", "probability", "uuid")(box)
# bbox = mirror_on_x_axis(bbox, page.bound().height)
# x0, y0, x1, y1 = bbox
# page.draw_rect(fitz.Rect(x0, y0, x1, y1), color=(0, 0, 1), width=2)
# label_x, label_y = x0, y0 - 5
# page.insert_text((label_x, label_y), f"{label} ({probability:.2f}), {uuid}", fontsize=12, color=(0.4, 0.4, 1))
for line in prediction["tableLines"]:
# The box is passed through mirror_on_x_axis together with the page height
# before drawing. NOTE(review): presumably a flip of the y axis; the helper is
# defined outside this view - confirm.
bbox = mirror_on_x_axis(bbox, page.bound().height)
x0, y0, x1, y1 = bbox
page.draw_rect(fitz.Rect(x0, y0, x1, y1), color=(0, 0, 1), width=2)
# The label is anchored at the box's x0, offset by 5 units from y0.
label_x, label_y = x0, y0 - 5
page.insert_text(
(label_x, label_y),
f"{label} ({probability:.2f}), {uuid}",
fontsize=12,
color=(0.4, 0.4, 1),
)
# `.get(..., [])` lets a prediction without a "tableLines" key skip this loop.
for line in prediction.get("tableLines", []):
start = itemgetter("x1", "y1")(line)
end = itemgetter("x2", "y2")(line)
page.draw_line(start, end, color=(1, 0, 0.5), width=1)

View File

@ -1,15 +1,13 @@
from dataclasses import dataclass
from functools import partial
from operator import itemgetter
from typing import Iterable
from typing import Tuple
from typing import Iterable, Tuple
import fitz
import numpy as np
from funcy import compose, lfilter
from numpy import ndarray as Array
from kn_utils.logging import logger
from numpy import ndarray as Array
@dataclass
@ -23,12 +21,17 @@ class PageInfo:
image_width: int | float
image_height: int | float
rotation: int
def transform_image_coordinates_to_pdf_coordinates(
bbox: Iterable[int | float], rotation_matrix: fitz.Matrix, transformation_matrix: fitz.Matrix, dpi: int = None
bbox: Iterable[int | float],
rotation_matrix: fitz.Matrix,
transformation_matrix: fitz.Matrix,
dpi: int = None,
) -> Tuple:
x1, y1, x2, y2 = map(lambda x: (x / dpi) * 72, bbox) if dpi else bbox # Convert to points, can be done before
x1, y1, x2, y2 = (
map(lambda x: (x / dpi) * 72, bbox) if dpi else bbox
) # Convert to points, can be done before
rect = fitz.Rect(x1, y1, x2, y2)
rect = rect * rotation_matrix * transformation_matrix
@ -42,51 +45,44 @@ def rescale_to_pdf(bbox: Iterable[int | float], page_info: PageInfo) -> Iterable
pix_h, pix_w = page_info.image_height, page_info.image_width
ratio_h, ratio_w = pdf_h / pix_h, pdf_w / pix_w
round3 = lambda x: tuple(map(lambda y: round(y, 3), x))
ratio_w, ratio_h, pdf_w, pdf_h, pix_w, pix_h = round3((ratio_w, ratio_h, pdf_w, pdf_h, pix_w, pix_h))
new_bbox = round3((bbox[0] * ratio_w, bbox[1] * ratio_h, bbox[2] * ratio_w, bbox[3] * ratio_h))
# logger.info(f"{pdf_h=}, {pix_h=}, {pdf_w=}, {pix_w=}, {ratio_w=}, {ratio_h=}")
# logger.info(round3(bbox))
# logger.info(new_bbox)
ratio_w, ratio_h, pdf_w, pdf_h, pix_w, pix_h = round3(
(ratio_w, ratio_h, pdf_w, pdf_h, pix_w, pix_h)
)
new_bbox = round3(
(bbox[0] * ratio_w, bbox[1] * ratio_h, bbox[2] * ratio_w, bbox[3] * ratio_h)
)
return new_bbox
# Convert every entry of bboxes["tableLines"]: each line dict is unpacked to
# (x1, y1, x2, y2), rescaled with rescale_to_pdf, shifted by `offsets`, and
# packed back into a dict. The converted list replaces bboxes["tableLines"];
# `bboxes` is modified in place and also returned.
# NOTE(review): this span looks like a diff rendered without +/- markers, so
# the signature, the nested helper body and the logging calls each appear
# twice (logger.info next to logger.debug) - confirm against the repository
# which statements are live before relying on statement order.
def transform_table_lines_by_page_info(bboxes: dict, offsets: tuple, page_info: PageInfo) -> dict:
# FIXME: Also convert image info? Is image info necessary?
# Also, the resulting lines are not in the table bbox, is this okay?
# transform = partial(
# transform_image_coordinates_to_pdf_coordinates,
# rotation_matrix=page_info.rotation_matrix,
# transformation_matrix=page_info.transformation_matrix,
# dpi=page_info.dpi,
# )
def transform_table_lines_by_page_info(
bboxes: dict, offsets: tuple, page_info: PageInfo
) -> dict:
transform = partial(rescale_to_pdf, page_info=page_info)
logger.info(f"{offsets=}")
logger.debug(f"{offsets=}")
# Shift one (x1, y1, x2, y2) tuple by `offsets`; the y offset is first
# replaced by page_info.height minus itself.
def apply_offsets(line: tuple) -> tuple:
x1, y1, x2, y2 = line
offset_x, offset_y = offsets
offset_y = page_info.height - offset_y # - (y2 * (y1 != y2))
logger.info((f"new offsets: {offset_x}, {offset_y}"))
x1, y1, x2, y2 = line
offset_x, offset_y = offsets
offset_y = page_info.height - offset_y
logger.debug((f"new offsets: {offset_x}, {offset_y}"))
return (x1 + offset_x, y1 + offset_y, x2 + offset_x, y2 + offset_y)
unpack = itemgetter("x1", "y1", "x2", "y2")
pack = lambda x: {"x1": x[0], "y1": x[1], "x2": x[2], "y2": x[3]}
# convert = compose(pack, transform, apply_offsets, unpack)
# funcy's compose applies right to left: unpack, transform, apply_offsets,
# pack - the rescaling therefore happens before the offsets are added.
convert = compose(pack, apply_offsets, transform, unpack)
# convert = compose(pack, transform, unpack)
# A missing "tableLines" key is treated as an empty list.
table_lines = bboxes.get("tableLines", [])
transformed_lines = list(map(convert, table_lines))
bboxes["tableLines"] = transformed_lines #lfilter(lambda b: b['y1']==b['y2'], transformed_lines)
bboxes[
"tableLines"
] = transformed_lines # lfilter(lambda b: b['y1']==b['y2'], transformed_lines)
import json
# Log every original line next to its converted counterpart.
for i in range(len(table_lines)):
logger.info(json.dumps(table_lines[i], indent=4))
logger.info(json.dumps(transformed_lines[i], indent=4))
logger.info('')
# exit()
logger.debug(json.dumps(table_lines[i], indent=4))
logger.debug(json.dumps(transformed_lines[i], indent=4))
logger.debug("")
return bboxes
@ -106,11 +102,10 @@ def extract_images_from_pdf(
boxes = page_dict["boxes"]
boxes = filter(lambda box_obj: box_obj["label"] == "table", boxes)
page = fh[page_num] # pages[int(page_num)]
page = fh[page_num]
page.wrap_contents()
# TODO: Workaround to be able to transform the image coordinates to pdf coordinates in a later step.
page_image = page.get_pixmap(dpi=200)
# import IPython; IPython.embed()
current_page_info = PageInfo(
page_num,
page.rotation_matrix,
@ -131,12 +126,21 @@ def extract_images_from_pdf(
# current_page_info object to include the derotation_matrix.
rect = rect * page.transformation_matrix * page.rotation_matrix
pixmap = page.get_pixmap(clip=rect, dpi=dpi, colorspace=fitz.csGRAY)
shape = (pixmap.h, pixmap.w, pixmap.n) if pixmap.n > 1 else (pixmap.h, pixmap.w)
shape = (
(pixmap.h, pixmap.w, pixmap.n)
if pixmap.n > 1
else (pixmap.h, pixmap.w)
)
image = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(*shape)
table_images.append(image)
table_info.append(
{"pageNum": page_num, "bbox": bbox, "uuid": box_obj["uuid"], "label": box_obj["label"]}
{
"pageNum": page_num,
"bbox": bbox,
"uuid": box_obj["uuid"],
"label": box_obj["label"],
}
)
page_info.append(current_page_info)