refactorig WIP 2: storage callback

This commit is contained in:
Matthias Bisping 2022-02-16 16:02:06 +01:00
parent 5cd52928ce
commit b1ade1fb65
3 changed files with 34 additions and 8 deletions

View File

@ -1,4 +1,11 @@
import json
import logging
import tempfile
from operator import itemgetter
from pyinfra.minio import MinioHandle
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
def make_callback(body_processor, output_queue_name):
@ -12,3 +19,27 @@ def make_callback(body_processor, output_queue_name):
channel.basic_ack(delivery_tag=method.delivery_tag)
return callback
def make_storage_callback(output_queue_name):
storage_client = MinioHandle()
def process(payload):
payload = json.loads(payload)
logging.debug(f"Received {payload}")
with tempfile.TemporaryDirectory() as pdf_compressed_dir:
with tempfile.TemporaryDirectory() as pdf_dir:
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
return json.dumps(unzipped_file_path)
callback = make_callback(process, output_queue_name)
return callback

View File

@ -47,8 +47,6 @@ def unzip(gz_path, pdf_dir):
assert ext == ".gz"
logging.debug(f"unzipping {gz_path} into {target_path}")
with gzip.open(gz_path, "rb") as f_in:
with open(target_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
@ -66,6 +64,7 @@ def unzip(gz_path, pdf_dir):
def download(storage_client, object_name, target_root_dir):
logging.debug(f"Downloading {object_name}...")
downloaded_file_path = storage_client.download_file(object_name, target_root_dir=target_root_dir)
logging.debug(f"Downloaded {object_name} into {downloaded_file_path}.")
return downloaded_file_path

View File

@ -7,7 +7,7 @@ from waitress import serve
from pyinfra.config import CONFIG
from pyinfra.consume import consume, ConsumerError
from pyinfra.callback import make_callback
from pyinfra.callback import make_callback, make_storage_callback
# TODO: implement meaningful checks
@ -35,11 +35,7 @@ def start_integrity_checks_webserver(mode="debug"):
def main():
def processor(body):
print(f"[OUT]: {json.loads(body)}")
return body
callback = make_callback(body_processor=processor, output_queue_name=CONFIG.rabbitmq.queues.output)
callback = make_storage_callback(output_queue_name=CONFIG.rabbitmq.queues.output)
webserver = Process(target=start_integrity_checks_webserver, args=("debug", ))
webserver.start()