Merge in RR/pyinfra from change-queue-durability to master
Squashed commit of the following:
commit b01e5e05e27776cbb629ba3c2e757a64934ace30
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:50:40 2022 +0100
refactor
commit 549b89952e626ef7b1029c97ec7fb527112cc2b2
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:47:13 2022 +0100
black
commit 6a36f56a3e6308f4954a28c1bf4797a838986371
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:46:51 2022 +0100
adjust mock scriptis,add different file format option for IO
commit 3b50f88482d74ea39240cdd41cc7dd62076c87b7
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:45:49 2022 +0100
change request_queue to durable=True
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
import argparse
|
|
import gzip
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from tqdm import tqdm
|
|
|
|
from pyinfra.config import CONFIG
|
|
from pyinfra.storage.storages import get_s3_storage
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser()
|
|
|
|
subparsers = parser.add_subparsers(help="sub-command help", dest="command")
|
|
|
|
parser_add = subparsers.add_parser("add", help="Add file(s) to the MinIO store")
|
|
parser_add.add_argument("dossier_id")
|
|
add_group = parser_add.add_mutually_exclusive_group(required=True)
|
|
add_group.add_argument("--file", "-f")
|
|
add_group.add_argument("--directory", "-d")
|
|
|
|
subparsers.add_parser("purge", help="Delete all files and buckets in the MinIO store")
|
|
|
|
args = parser.parse_args()
|
|
return args
|
|
|
|
|
|
def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension):
|
|
return f"{dossier_id}/{file_id}{extension}"
|
|
|
|
|
|
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
|
|
data = gzip.compress(result.encode())
|
|
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
|
|
storage.put_object(bucket_name, path_gz, data)
|
|
|
|
|
|
def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
|
|
if Path(path).suffix == ".pdf":
|
|
suffix_gz = ".ORIGIN.pdf.gz"
|
|
if Path(path).suffix == ".json":
|
|
suffix_gz = ".TEXT.json.gz"
|
|
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz)
|
|
|
|
with open(path, "rb") as f:
|
|
data = gzip.compress(f.read())
|
|
storage.put_object(bucket_name, path_gz, data)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
storage = get_s3_storage()
|
|
bucket_name = CONFIG.storage.bucket
|
|
|
|
if not storage.has_bucket(bucket_name):
|
|
storage.make_bucket(bucket_name)
|
|
|
|
args = parse_args()
|
|
|
|
if args.command == "add":
|
|
|
|
if args.file:
|
|
add_file_compressed(storage, bucket_name, args.dossier_id, args.file)
|
|
|
|
elif args.directory:
|
|
for fname in tqdm([*os.listdir(args.directory)], desc="Adding files"):
|
|
path = Path(args.directory) / fname
|
|
add_file_compressed(storage, bucket_name, args.dossier_id, path)
|
|
|
|
elif args.command == "purge":
|
|
storage.clear_bucket(bucket_name)
|