refactoring WIP 4: callback factories

This commit is contained in:
Matthias Bisping 2022-02-16 17:16:45 +01:00
parent 61e03f9c47
commit 1cca0455da
3 changed files with 52 additions and 28 deletions

View File

@@ -24,6 +24,7 @@ service:
   logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log)
   name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster
   storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from
+  analysis_endpoint: $ANALYSIS_ENDPOINT|localhost # Endpoint of the analysis service the files are sent to
 webserver:
   host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address
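The `$ENV_VAR|default` pattern used throughout this config suggests environment-variable substitution with a fallback. As a rough illustration only (the actual loader behind `pyinfra.config` is not part of this diff, so the resolver below is hypothetical):

    import os
    import re

    # Hypothetical resolver for the "$ENV_VAR|default" convention seen above;
    # the real pyinfra.config loader may well differ.
    ENV_DEFAULT = re.compile(r"^\$(?P<var>[A-Z_][A-Z0-9_]*)\|(?P<default>.*)$")

    def resolve(raw):
        match = ENV_DEFAULT.match(raw)
        if match is None:
            return raw  # plain literal, no substitution
        default = match.group("default")
        return os.environ.get(match.group("var"), None if default == "null" else default)

    # resolve("$ANALYSIS_ENDPOINT|localhost") -> "localhost" unless ANALYSIS_ENDPOINT is set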

View File

@@ -1,43 +1,27 @@
 import json
-import logging
-import tempfile
-from operator import itemgetter
-from flask import jsonify
 from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
-from pyinfra.storage.storage import StorageHandle
-from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip


-def make_callback(body_processor, output_queue_name):
+def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name):
     connection = make_connection()
     channel = make_channel(connection)
     declare_queue(channel, output_queue_name)

     def callback(channel, method, _, body):
-        channel.basic_publish(exchange="", routing_key=output_queue_name, body=body_processor(body))
+        channel.basic_publish(exchange="", routing_key=output_queue_name, body=json_wrapped_body_processor(body))
         channel.basic_ack(delivery_tag=method.delivery_tag)

     return callback


-def make_storage_callback(output_queue_name, storage: StorageHandle):
-    def process(payload):
-        payload = json.loads(payload)
-        logging.debug(f"Received {payload}")
-        with tempfile.TemporaryDirectory() as pdf_compressed_dir:
-            with tempfile.TemporaryDirectory() as pdf_dir:
-                dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
-                object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
-                downloaded_file_path = download(storage, object_name, pdf_compressed_dir)
-                unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
-        return json.dumps(unzipped_file_path)
+def processor_to_callback(body_processor, output_queue_name):
+    def wrap_body_processor_on_json_decode_encode_operations(payload):
+        return json.dumps(body_processor(json.loads(payload)))

-    callback = make_callback(process, output_queue_name)
+    callback = make_callback_for_output_queue(wrap_body_processor_on_json_decode_encode_operations, output_queue_name)
     return callback
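The refactoring splits the old make_storage_callback into two orthogonal factories: processor_to_callback lifts a dict-in/dict-out processor to the wire format via json.loads/json.dumps, and make_callback_for_output_queue adds the publish/ack plumbing on top. A minimal self-contained sketch of that composition (double_values is a made-up stand-in processor; the pyinfra.rabbitmq helpers are assumed to wrap pika):

    import json

    def double_values(payload):  # made-up stand-in for a real body processor
        return {key: value * 2 for key, value in payload.items()}

    # Layer 1 (what processor_to_callback does): dict processor -> string processor
    def wrap(body):
        return json.dumps(double_values(json.loads(body)))

    # Layer 2 (make_callback_for_output_queue) would then wrap `wrap` in
    # basic_publish/basic_ack; neither layer needs to know about the other.
    assert wrap('{"pages": 2}') == '{"pages": 4}'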

View File

@@ -1,17 +1,56 @@
+import json
 import logging
+import tempfile
 from multiprocessing import Process
+from operator import itemgetter
 from flask import Flask, jsonify
 from waitress import serve
+from pyinfra.callback import processor_to_callback
 from pyinfra.config import CONFIG
 from pyinfra.consume import consume, ConsumerError
-from pyinfra.callback import make_callback, make_storage_callback
 from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle
 from pyinfra.storage.minio import MinioHandle
 # from image_service.storage.azure_blob_storage import AzureBlobStorageHandle
+from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip


+def make_file_getter(storage):
+    def get_file(payload):
+        with tempfile.TemporaryDirectory() as pdf_compressed_dir:
+            with tempfile.TemporaryDirectory() as pdf_dir:
+                dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
+                object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
+                downloaded_file_path = download(storage, object_name, pdf_compressed_dir)
+                unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
+        # NOTE: both TemporaryDirectory objects are cleaned up when the with-blocks
+        # exit, so unzipped_file_path no longer exists by the time it is returned
+        return unzipped_file_path
+    return get_file


+def make_file_analyzer(analysis_endpoint):
+    def analyze_file(file_path):
+        # REST call
+        return 1
+    return analyze_file


+def make_payload_processor(analysis_endpoint):
+    get_file = make_file_getter(get_storage())
+    analyze_file = make_file_analyzer(analysis_endpoint)
+
+    def process(payload: dict):
+        logging.info(f"Processing {payload}")
+        file_path = get_file(payload)
+        predictions = analyze_file(file_path)
+        return json.dumps(predictions)
+    return process


+# TODO: implement meaningful checks
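make_file_analyzer is still a stub (`# REST call` / `return 1`). One hypothetical shape the finished closure could take, assuming the analysis service accepts a file upload over HTTP at the configured endpoint (the route, scheme, and response format below are invented for illustration, not taken from the commit):

    import requests  # assumption: the analysis service is called over HTTP

    def make_file_analyzer(analysis_endpoint):
        def analyze_file(file_path):
            # Hypothetical request; URL scheme, route and JSON response are guesses.
            with open(file_path, "rb") as file_handle:
                response = requests.post(f"http://{analysis_endpoint}/analyze",
                                         files={"file": file_handle})
            response.raise_for_status()
            return response.json()  # predictions, later json.dumps'd by process()
        return analyze_file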
@@ -56,7 +95,7 @@ def get_storage():
 def main():
-    callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output, storage=get_storage())
+    callback = processor_to_callback(make_payload_processor(CONFIG.service.analysis_endpoint), output_queue_name=CONFIG.rabbitmq.queues.output)
     webserver = Process(target=start_integrity_checks_webserver, args=("debug", ))
     webserver.start()
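For context, the callback produced here has the (channel, method, properties, body) signature that pika's on_message_callback expects, so the consume helper from pyinfra.consume (not shown in this diff) presumably wires it up roughly along these lines:

    import pika  # assumption: pyinfra wraps pika; this sketch inlines that wiring

    def consume(input_queue_name, callback, host="localhost"):
        connection = pika.BlockingConnection(pika.ConnectionParameters(host))
        channel = connection.channel()
        channel.queue_declare(queue=input_queue_name)
        # callback(channel, method, properties, body) publishes the processed
        # body to the output queue and acks the delivery, as defined above
        channel.basic_consume(queue=input_queue_name, on_message_callback=callback)
        channel.start_consuming()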