From b1ade1fb659da75594621e8ff94347a858c223b3 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 16 Feb 2022 16:02:06 +0100 Subject: [PATCH] refactorig WIP 2: storage callback --- pyinfra/callback.py | 31 +++++++++++++++++++++++++++++++ pyinfra/utils/file.py | 3 +-- src/serve.py | 8 ++------ 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/pyinfra/callback.py b/pyinfra/callback.py index c9e3667..20548c5 100644 --- a/pyinfra/callback.py +++ b/pyinfra/callback.py @@ -1,4 +1,11 @@ +import json +import logging +import tempfile +from operator import itemgetter + +from pyinfra.minio import MinioHandle from pyinfra.rabbitmq import make_connection, make_channel, declare_queue +from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip def make_callback(body_processor, output_queue_name): @@ -12,3 +19,27 @@ def make_callback(body_processor, output_queue_name): channel.basic_ack(delivery_tag=method.delivery_tag) return callback + + +def make_storage_callback(output_queue_name): + + storage_client = MinioHandle() + + def process(payload): + + payload = json.loads(payload) + + logging.debug(f"Received {payload}") + + with tempfile.TemporaryDirectory() as pdf_compressed_dir: + with tempfile.TemporaryDirectory() as pdf_dir: + + dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) + object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id) + downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir) + unzipped_file_path = unzip(downloaded_file_path, pdf_dir) + + return json.dumps(unzipped_file_path) + + callback = make_callback(process, output_queue_name) + return callback diff --git a/pyinfra/utils/file.py b/pyinfra/utils/file.py index 9692bea..66727ce 100644 --- a/pyinfra/utils/file.py +++ b/pyinfra/utils/file.py @@ -47,8 +47,6 @@ def unzip(gz_path, pdf_dir): assert ext == ".gz" - logging.debug(f"unzipping {gz_path} into {target_path}") - with gzip.open(gz_path, "rb") as f_in: with open(target_path, "wb") as f_out: shutil.copyfileobj(f_in, f_out) @@ -66,6 +64,7 @@ def unzip(gz_path, pdf_dir): def download(storage_client, object_name, target_root_dir): + logging.debug(f"Downloading {object_name}...") downloaded_file_path = storage_client.download_file(object_name, target_root_dir=target_root_dir) logging.debug(f"Downloaded {object_name} into {downloaded_file_path}.") return downloaded_file_path diff --git a/src/serve.py b/src/serve.py index 020b003..1772242 100644 --- a/src/serve.py +++ b/src/serve.py @@ -7,7 +7,7 @@ from waitress import serve from pyinfra.config import CONFIG from pyinfra.consume import consume, ConsumerError -from pyinfra.callback import make_callback +from pyinfra.callback import make_callback, make_storage_callback # TODO: implement meaningful checks @@ -35,11 +35,7 @@ def start_integrity_checks_webserver(mode="debug"): def main(): - def processor(body): - print(f"[OUT]: {json.loads(body)}") - return body - - callback = make_callback(body_processor=processor, output_queue_name=CONFIG.rabbitmq.queues.output) + callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output) webserver = Process(target=start_integrity_checks_webserver, args=("debug", )) webserver.start()