57 lines
1.9 KiB
Python
57 lines
1.9 KiB
Python
import gzip
|
|
import json
|
|
import logging
|
|
from operator import itemgetter
|
|
|
|
from cv_analysis.config import get_config
|
|
from cv_analysis.server.pipeline import make_analysis_pipeline_for_segment_type
|
|
from cv_analysis.utils.banner import make_art
|
|
from pyinfra import config as pyinfra_config
|
|
from pyinfra.queue.queue_manager import QueueManager
|
|
from pyinfra.storage.storage import get_storage
|
|
|
|
# pyinfra configuration, loaded once at import time. This module reads its
# `logging_level_root` and `storage_bucket` attributes and passes the whole
# object to `get_storage` and `QueueManager`.
PYINFRA_CONFIG = pyinfra_config.get_config()
|
|
# cv_analysis configuration, loaded once at import time. This module reads its
# `table_parsing_skip_pages_without_images` attribute when building a pipeline.
CV_CONFIG = get_config()
|
|
|
|
# Configure the root logger at import time with the level taken from the
# pyinfra configuration.
logging.basicConfig(level=PYINFRA_CONFIG.logging_level_root)
|
|
|
|
|
|
def analysis_callback(queue_message: dict):
    """Analyse the stored file named by a queue message and upload the result.

    The message must provide the keys ``dossierId``, ``fileId``,
    ``targetFileExtension``, ``responseFileExtension`` and ``operation``;
    a missing key raises ``KeyError``.

    The object ``<dossierId>/<fileId>.<targetFileExtension>`` is fetched from
    the configured bucket, gzip-decompressed and handed to the analysis
    pipeline built for ``operation``. The pipeline results are stored under
    ``"data"`` in a copy of the message, which is JSON-encoded,
    gzip-compressed and written to
    ``<dossierId>/<fileId>.<responseFileExtension>`` in the same bucket.

    Returns:
        A ``(should_publish_result, ids)`` tuple. The flag is ``False`` when
        the target object does not exist (nothing is analysed or written) and
        ``True`` once the response has been stored. ``ids`` is
        ``{"dossierId": ..., "fileId": ...}``.
    """
    required_keys = (
        "dossierId",
        "fileId",
        "targetFileExtension",
        "responseFileExtension",
        "operation",
    )
    dossier_id, file_id, source_ext, result_ext, operation = itemgetter(*required_keys)(queue_message)
    bucket_name = PYINFRA_CONFIG.storage_bucket
    # The `{name=}` specifiers embed the local variable names in the log text,
    # so `dossier_id`, `file_id` and `operation` must keep these exact names.
    logging.info(f"Processing {dossier_id=}/{file_id=}, {operation=}.")

    store = get_storage(PYINFRA_CONFIG)
    source_key = f"{dossier_id}/{file_id}.{source_ext}"
    ids = {"dossierId": dossier_id, "fileId": file_id}

    # Guard clause: without the input object there is nothing to analyse.
    if not store.exists(bucket_name, source_key):
        return False, ids

    raw_document = gzip.decompress(store.get_object(bucket_name, source_key))
    run_pipeline = make_analysis_pipeline_for_segment_type(
        operation,
        skip_pages_without_images=CV_CONFIG.table_parsing_skip_pages_without_images,
    )

    # list() materialises the pipeline output so it can be JSON-serialised.
    enriched_message = {**queue_message, "data": list(run_pipeline(raw_document))}
    compressed_payload = gzip.compress(json.dumps(enriched_message).encode())

    result_key = f"{dossier_id}/{file_id}.{result_ext}"
    store.put_object(bucket_name, result_key, compressed_payload)
    return True, ids
|
|
|
|
|
|
if __name__ == "__main__":
    # Log the startup banner, then hand `analysis_callback` to the queue
    # manager so it is used for consumed messages.
    banner = make_art()
    logging.info(banner)

    QueueManager(PYINFRA_CONFIG).start_consuming(analysis_callback)
|