2023-03-16 17:33:49 +01:00

68 lines
2.3 KiB
Python

import gzip
import json
import logging
from operator import itemgetter
from funcy import compose
from cv_analysis.config import get_config
from cv_analysis.server.pipeline import get_analysis_pipeline
from cv_analysis.utils.banner import make_art
from pyinfra import config as pyinfra_config
from pyinfra.payload_processing.monitor import get_monitor
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage.storage import get_storage
# Module-level setup, executed once at import time.

# pyinfra configuration; read below for the root log level and the storage
# bucket, and passed to get_monitor / get_storage / QueueManager.
PYINFRA_CONFIG = pyinfra_config.get_config()
# cv_analysis configuration; supplies table_parsing_skip_pages_without_images
# when the analysis pipeline is built.
CV_CONFIG = get_config()

# The root logger level is taken from the pyinfra configuration.
logging.basicConfig(level=PYINFRA_CONFIG.logging_level_root)

# TODO: add kwargs / operation-key passing to the processing fn in pyinfra's
#  PayloadProcessor so that it can be used here.

# Built once and reused: analysis_callback wraps every analysis function with it.
# NOTE(review): presumably records processing metrics -- confirm in pyinfra.
MONITOR = get_monitor(PYINFRA_CONFIG)
def analysis_callback(queue_message: dict):
    """Run the requested analysis on a stored file and store the result.

    The object ``{dossierId}/{fileId}.{targetFileExtension}`` is fetched from
    the configured storage bucket, gunzipped and passed through the analysis
    pipeline selected by ``operation``. The results are attached to a copy of
    the incoming message under the ``"data"`` key, serialised as
    gzip-compressed JSON and written to
    ``{dossierId}/{fileId}.{responseFileExtension}`` in the same bucket.

    Args:
        queue_message: Message dict containing the keys ``dossierId``,
            ``fileId``, ``targetFileExtension``, ``responseFileExtension``
            and ``operation``.

    Returns:
        ``{"dossierId": ..., "fileId": ...}`` once the response has been
        stored, or ``None`` when the target object does not exist in storage.

    Raises:
        KeyError: If a required key is missing from ``queue_message``.
    """
    dossier_id, file_id, target_file_ext, response_file_ext, operation = itemgetter(
        "dossierId", "fileId", "targetFileExtension", "responseFileExtension", "operation"
    )(queue_message)

    bucket = PYINFRA_CONFIG.storage_bucket
    logging.info("running operation %s file_id=%s and dossier_id=%s", operation, file_id, dossier_id)

    storage = get_storage(PYINFRA_CONFIG)
    object_name = f"{dossier_id}/{file_id}.{target_file_ext}"

    # Guard clause: without the target object there is nothing to analyse.
    # Log the skip so a message that yields no result can be traced.
    if not storage.exists(bucket, object_name):
        logging.warning(
            "object %s not found in bucket %s, skipping operation %s for file_id=%s and dossier_id=%s",
            object_name,
            bucket,
            operation,
            file_id,
            dossier_id,
        )
        return None

    object_bytes = gzip.decompress(storage.get_object(bucket, object_name))

    # Materialise the pipeline output inside the monitored call, so that the
    # wrapped function does the actual work rather than returning a lazy iterable.
    analysis_fn = MONITOR(
        compose(
            list,
            get_analysis_pipeline(operation, CV_CONFIG.table_parsing_skip_pages_without_images),
        )
    )
    # The outer list() is kept from the original: MONITOR's return contract is
    # not visible here, and copying an existing list is harmless.
    results = list(analysis_fn(object_bytes))
    logging.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id)

    # The response echoes the incoming message and adds the results under "data".
    response_payload = {**queue_message, "data": results}
    response_bytes = gzip.compress(json.dumps(response_payload).encode())
    response_name = f"{dossier_id}/{file_id}.{response_file_ext}"

    logging.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id)
    storage.put_object(bucket, response_name, response_bytes)

    return {"dossierId": dossier_id, "fileId": file_id}
if __name__ == "__main__":
    # Script entry point: log the output of make_art(), then start consuming
    # from the queue with analysis_callback as the message handler.
    banner = make_art()
    logging.info(banner)
    QueueManager(PYINFRA_CONFIG).start_consuming(analysis_callback)