From 1cca0455da7fbf95bfe44cd02a66c02ab805721d Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 16 Feb 2022 17:16:45 +0100 Subject: [PATCH] refactoring WIP 4: callback factories --- config.yaml | 1 + pyinfra/callback.py | 32 ++++++++---------------------- src/serve.py | 47 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 52 insertions(+), 28 deletions(-) diff --git a/config.yaml b/config.yaml index f9a0671..840469e 100755 --- a/config.yaml +++ b/config.yaml @@ -24,6 +24,7 @@ service: logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log) name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from + analysis_endpoint: $ANALYSIS_ENDPOINT|localhost webserver: host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address diff --git a/pyinfra/callback.py b/pyinfra/callback.py index cbf76e6..5937e76 100644 --- a/pyinfra/callback.py +++ b/pyinfra/callback.py @@ -1,43 +1,27 @@ import json -import logging -import tempfile -from operator import itemgetter + +from flask import jsonify from pyinfra.rabbitmq import make_connection, make_channel, declare_queue -from pyinfra.storage.storage import StorageHandle -from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip -def make_callback(body_processor, output_queue_name): +def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name): connection = make_connection() channel = make_channel(connection) declare_queue(channel, output_queue_name) def callback(channel, method, _, body): - channel.basic_publish(exchange="", routing_key=output_queue_name, body=body_processor(body)) + channel.basic_publish(exchange="", routing_key=output_queue_name, body=json_wrapped_body_processor(body)) channel.basic_ack(delivery_tag=method.delivery_tag) return callback -def 
make_storage_callback(output_queue_name, storage: StorageHandle): +def processor_to_callback(body_processor, output_queue_name): - def process(payload): + def wrap_body_processor_on_json_decode_encode_operations(payload): + return json.dumps(body_processor(json.loads(payload))) - payload = json.loads(payload) - - logging.debug(f"Received {payload}") - - with tempfile.TemporaryDirectory() as pdf_compressed_dir: - with tempfile.TemporaryDirectory() as pdf_dir: - - dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) - object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id) - downloaded_file_path = download(storage, object_name, pdf_compressed_dir) - unzipped_file_path = unzip(downloaded_file_path, pdf_dir) - - return json.dumps(unzipped_file_path) - - callback = make_callback(process, output_queue_name) + callback = make_callback_for_output_queue(wrap_body_processor_on_json_decode_encode_operations, output_queue_name) return callback diff --git a/src/serve.py b/src/serve.py index da7fd5b..2be3cdc 100644 --- a/src/serve.py +++ b/src/serve.py @@ -1,17 +1,56 @@ import json import logging +import logging +import tempfile from multiprocessing import Process +from operator import itemgetter from flask import Flask, jsonify from waitress import serve +from pyinfra.callback import processor_to_callback from pyinfra.config import CONFIG from pyinfra.consume import consume, ConsumerError -from pyinfra.callback import make_callback, make_storage_callback from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle - from pyinfra.storage.minio import MinioHandle -# from image_service.storage.azure_blob_storage import AzureBlobStorageHandle +from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip + + +def make_file_getter(storage): + def get_file(payload): + with tempfile.TemporaryDirectory() as pdf_compressed_dir: + with tempfile.TemporaryDirectory() as pdf_dir: + 
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) + object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id) + downloaded_file_path = download(storage, object_name, pdf_compressed_dir) + unzipped_file_path = unzip(downloaded_file_path, pdf_dir) + + return unzipped_file_path + + return get_file + + +def make_file_analyzer(analysis_endpoint): + + def analyze_file(file_path): + # REST call + return 1 + + return analyze_file + + +def make_payload_processor(analysis_endpoint): + + get_file = make_file_getter(get_storage()) + analyze_file = make_file_analyzer(analysis_endpoint) + + def process(payload: dict): + logging.info(f"Processing {payload}") + file_path = get_file(payload) + predictions = analyze_file(file_path) + return json.dumps(predictions) + + return process # TODO: implement meaningful checks @@ -56,7 +95,7 @@ def get_storage(): def main(): - callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output, storage=get_storage()) + callback = processor_to_callback(make_payload_processor(CONFIG.service.analysis_endpoint), output_queue_name=CONFIG.rabbitmq.queues.output) webserver = Process(target=start_integrity_checks_webserver, args=("debug", )) webserver.start()