Update pyinfra for multi-tenancy support
Update serve script with PayloadProcessor from pyinfra
This commit is contained in:
parent
cfbd2e287a
commit
87122ffb96
@ -1 +1 @@
|
||||
Subproject commit 0f24a7f26da3ce8ce326bf02b2d1946b6483be11
|
||||
Subproject commit 793a427c50d150523856c29b79a0000d7cde88ed
|
||||
66
src/serve.py
66
src/serve.py
@ -1,67 +1,37 @@
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
from operator import itemgetter
|
||||
|
||||
from funcy import compose
|
||||
|
||||
from cv_analysis.config import get_config
|
||||
from cv_analysis.server.pipeline import get_analysis_pipeline
|
||||
from cv_analysis.utils.banner import make_art
|
||||
from pyinfra import config as pyinfra_config
|
||||
from pyinfra.payload_processing.monitor import get_monitor
|
||||
from pyinfra.payload_processing import make_payload_processor
|
||||
from pyinfra.queue.queue_manager import QueueManager
|
||||
from pyinfra.storage.storage import get_storage
|
||||
|
||||
PYINFRA_CONFIG = pyinfra_config.get_config()
|
||||
CV_CONFIG = get_config()
|
||||
|
||||
logging.basicConfig(level=PYINFRA_CONFIG.logging_level_root)
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
|
||||
|
||||
|
||||
# TODO: add kwargs/ operation key passing to processing fn in pyinfra PayloadProcessor be able to use it here.
|
||||
MONITOR = get_monitor(PYINFRA_CONFIG)
|
||||
def make_dispatched_data_analysis(config):
|
||||
skip_pages_without_images = config.table_parsing_skip_pages_without_images
|
||||
|
||||
def inner(data: bytes, operation) -> list:
|
||||
analyse = get_analysis_pipeline(operation, skip_pages_without_images)
|
||||
return list(analyse(data))
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
def analysis_callback(queue_message: dict):
|
||||
def main():
|
||||
logger.info("Application is starting")
|
||||
|
||||
dossier_id, file_id, target_file_ext, response_file_ext, operation = itemgetter(
|
||||
"dossierId", "fileId", "targetFileExtension", "responseFileExtension", "operation"
|
||||
)(queue_message)
|
||||
bucket = PYINFRA_CONFIG.storage_bucket
|
||||
logging.info("running operation %s file_id=%s and dossier_id=%s", operation, file_id, dossier_id)
|
||||
process_data = make_dispatched_data_analysis(config=CV_CONFIG)
|
||||
process_payload = make_payload_processor(process_data, config=PYINFRA_CONFIG)
|
||||
|
||||
storage = get_storage(PYINFRA_CONFIG)
|
||||
object_name = f"{dossier_id}/{file_id}.{target_file_ext}"
|
||||
|
||||
if storage.exists(bucket, object_name):
|
||||
object_bytes = gzip.decompress(storage.get_object(bucket, object_name))
|
||||
analysis_fn = MONITOR(
|
||||
compose(
|
||||
list,
|
||||
get_analysis_pipeline(operation, CV_CONFIG.table_parsing_skip_pages_without_images),
|
||||
)
|
||||
)
|
||||
|
||||
results = list(analysis_fn(object_bytes))
|
||||
logging.info("predictions ready for file_id=%s and dossier_id=%s", file_id, dossier_id)
|
||||
|
||||
response = {**queue_message, "data": results}
|
||||
response = gzip.compress(json.dumps(response).encode())
|
||||
response_name = f"{dossier_id}/{file_id}.{response_file_ext}"
|
||||
|
||||
logging.info("storing predictions for file_id=%s and dossier_id=%s", file_id, dossier_id)
|
||||
storage.put_object(bucket, response_name, response)
|
||||
|
||||
return {"dossierId": dossier_id, "fileId": file_id}
|
||||
|
||||
else:
|
||||
return None
|
||||
queue_manager = QueueManager(PYINFRA_CONFIG)
|
||||
queue_manager.start_consuming(process_payload)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
logging.info(make_art())
|
||||
|
||||
queue_manager = QueueManager(PYINFRA_CONFIG)
|
||||
queue_manager.start_consuming(analysis_callback)
|
||||
main()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user