import gzip
from operator import itemgetter
from typing import Callable

from funcy import lmap
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic

from cv_analysis.server.format import make_formatter
from cv_analysis.utils.logging import get_logger
from cv_analysis.utils.open_img import open_img

logger = get_logger()


def make_streamable_analysis_fn(analysis_fn: Callable):
    """Adapt a cv-analysis function to the (data, metadata) streaming interface.

    The adapted callable receives a gzip-compressed payload together with its
    metadata, runs ``analysis_fn`` on the decoded image and hands the formatted
    findings back inside the metadata. See the PyInfra documentation for
    details on the surrounding server logic.

    Args:
        analysis_fn: cv-analysis function; it is called with the opened image
            and must return an iterable of objects exposing ``json_xywh()``.

    Returns:
        The result of passing the adapter to
        ``make_streamable_and_wrap_in_packing_logic`` with ``batched=False``.
    """

    def analyse(data: bytes, metadata: dict):
        """Analyse one payload and return ``(b"", metadata plus "cells")``."""
        # Only the first element of whatever open_img returns is analysed.
        image = open_img(gzip.decompress(data))[0]

        dpi = metadata["image_info"]["dpi"]
        page_info = metadata["page_info"]
        width, height, rotation = page_info["width"], page_info["height"], page_info["rotation"]
        page_size = (width, height)
        formatter = make_formatter(dpi, page_size, rotation)

        # Lazily serialise each finding; lmap then formats and collects them.
        serialised = (finding.json_xywh() for finding in analysis_fn(image))
        results = {"cells": lmap(formatter, serialised)}

        logger.debug(f"Page {metadata['page_info'].get('index', '')}: Found {len(results['cells'])} cells.")

        # No binary payload is produced; the findings travel in the metadata,
        # where a pre-existing "cells" key is overwritten.
        return b"", {**metadata, **results}

    return make_streamable_and_wrap_in_packing_logic(analyse, batched=False)