From 4edf04ab24e4be4dd38ca352e77f5bd0442a83e4 Mon Sep 17 00:00:00 2001
From: Julius Unverfehrt
Date: Wed, 16 Feb 2022 11:57:52 +0100
Subject: [PATCH] Pull request #1: Add storage handle

Merge in RR/mini_queue from add-storage-handle to master

Squashed commit of the following:

commit 03e542d2a65802c28735873fae184209f0c83553
Author: Julius Unverfehrt
Date:   Wed Feb 16 11:55:34 2022 +0100

    Quickfix typo

commit b4d538e9445187435d87c5cf8ce1f4e448021129
Author: Julius Unverfehrt
Date:   Wed Feb 16 11:41:42 2022 +0100

    added prefetch count and make channel function

commit d46d1375e387d36641c06b062a8ccc54f114ef4c
Author: Julius Unverfehrt
Date:   Wed Feb 16 11:20:39 2022 +0100

    black on M.s request

commit bc47b20312a978f19b08531804bf42b00f0a88f0
Author: Julius Unverfehrt
Date:   Wed Feb 16 11:19:57 2022 +0100

    changed response

commit 9a475ecd8df9ca007e5f7fe146483b6403eccc3b
Author: Julius Unverfehrt
Date:   Wed Feb 16 10:15:08 2022 +0100

    .

commit 108bc3ea90d867575db8c1b1503c9df859222485
Author: Julius Unverfehrt
Date:   Wed Feb 16 09:56:56 2022 +0100

    quickrestore

commit ae04d17d8d041f612d86117e8e96c96ddffcbde3
Author: Julius Unverfehrt
Date:   Wed Feb 16 09:37:30 2022 +0100

    refactor

commit 68051a72eb93868eba8adba234258b9e5373ecaa
Author: Julius Unverfehrt
Date:   Wed Feb 16 08:50:59 2022 +0100

    added answer file template for rancher

commit 09ef45ead51c07732a20133acad0b8b2ae7d0a61
Author: Julius Unverfehrt
Date:   Wed Feb 16 08:26:05 2022 +0100

    Quickfix inconsistency

commit d925b0f3f91f29403c88fb6149566ec966af2973
Author: Julius Unverfehrt
Date:   Wed Feb 16 08:20:40 2022 +0100

    Quick refactor

commit 48795455cde8d97ed98e58c3004a87a26f331352
Author: Julius Unverfehrt
Date:   Tue Feb 15 17:46:45 2022 +0100

    bluckckck

commit 80e58efab0269dc513990f83b14ceb36b3e4dd8e
Author: Julius Unverfehrt
Date:   Tue Feb 15 17:45:49 2022 +0100

    Quick restatus setting

commit 83f276ee13348a678b7da84e25ca844dd348b4c9
Author: Julius Unverfehrt
Date:   Tue Feb 15 17:30:16 2022 +0100

    Quickreset to working status

commit d44cdcf922250639a6832cc3e16d0d967d9853fb
Author: Julius Unverfehrt
Date:   Tue Feb 15 14:44:26 2022 +0100

    added storage handle for minio WIP
---
 .dockerignore                      |   1 +
 config.yaml                        |  11 +-
 docker-compose.yaml                |  11 ++
 dotfiles/minimal_conf_rancher.yaml |  76 ++++++------
 mini_queue/run.py                  |  35 ++----
 mini_queue/utils/file.py           |  88 ++++++++++++++
 mini_queue/utils/meta.py           | 116 +++++++++++++++++++
 mini_queue/utils/minio.py          | 105 +++++++++++++++++
 mini_queue/utils/rabbitmq.py       |  66 +++++++++++
 mini_queue/utils/storage.py        | 179 +++++++++++++++++++++++++++++
 requirements.txt                   |   2 ++
 scripts/manage_minio.py            |  51 ++++++++
 scripts/mock_knock.py              |  38 ++++++
 scripts/mock_publish.py            |  30 -----
 14 files changed, 715 insertions(+), 94 deletions(-)
 create mode 100644 .dockerignore
 create mode 100644 mini_queue/utils/file.py
 create mode 100644 mini_queue/utils/meta.py
 create mode 100644 mini_queue/utils/minio.py
 create mode 100644 mini_queue/utils/rabbitmq.py
 create mode 100644 mini_queue/utils/storage.py
 create mode 100644 scripts/manage_minio.py
 create mode 100644 scripts/mock_knock.py
 delete mode 100644 scripts/mock_publish.py

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..1269488
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1 @@
+data
diff --git a/config.yaml b/config.yaml
index a03c35f..451d999 100755
--- a/config.yaml
+++ b/config.yaml
@@ -9,7 +9,16 @@ rabbitmq:
   queues: # Hack image-service queues
     input: image_request_queue # requests to service
     output: image_response_queue # responses by service
-    dead_letter: image_letter_queue # messages that failed to process
+    dead_letter: image_dead_letter_queue # messages that failed to process
+
+  prefetch_count: 2
+
+minio:
+  host: $STORAGE_ENDPOINT|localhost # MinIO host address
+  port: $STORAGE_PORT|9000 # MinIO host port
+  user: $STORAGE_KEY|root # MinIO user name
+  password: $STORAGE_SECRET|password # MinIO user password
+  bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket
 
 service:
diff --git a/docker-compose.yaml b/docker-compose.yaml
index d6d75a2..4f4a6fa 100755
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,5 +1,16 @@
 version: '2'
 services:
+  minio:
+    image: minio/minio
+    ports:
+      - "9000:9000"
+    environment:
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_ROOT_USER=root
+    volumes:
+      - ./data/minio_store:/data
+    command: server /data
+    network_mode: "bridge"
   rabbitmq:
     image: docker.io/bitnami/rabbitmq:3.9
     ports:
diff --git a/dotfiles/minimal_conf_rancher.yaml b/dotfiles/minimal_conf_rancher.yaml
index a23ee21..de28a60 100644
--- a/dotfiles/minimal_conf_rancher.yaml
+++ b/dotfiles/minimal_conf_rancher.yaml
@@ -2,25 +2,25 @@ apiVersion: apps/v1
 kind: Deployment
 metadata:
   annotations:
-    meta.helm.sh/release-name: red-research
-    meta.helm.sh/release-namespace: red-research
+    meta.helm.sh/release-name: red-research2
+    meta.helm.sh/release-namespace: red-research2
   labels:
     apiVersion: v2
     app: image-service
-    app.kubernetes.io/instance: red-research
+    app.kubernetes.io/instance: red-research2
     app.kubernetes.io/managed-by: Helm
     app.kubernetes.io/name: redaction
     helm.sh/chart: redaction
-    io.cattle.field/appId: red-research
+    io.cattle.field/appId: red-research2
     type: service
   name: mini-queue
-  namespace: red-research
+  namespace: red-research2
 spec:
   selector:
     matchLabels:
       apiVersion: v2
       app: image-service
-      io.cattle.field/appId: red-research
+      io.cattle.field/appId: red-research2
   template:
     metadata:
       annotations:
@@ -30,7 +30,7 @@ spec:
       labels:
         apiVersion: v2
         app: image-service
-        io.cattle.field/appId: red-research
+        io.cattle.field/appId: red-research2
     spec:
       affinity:
         podAntiAffinity:
@@ -49,55 +49,57 @@ spec:
      - env:
        - name: BATCH_SIZE
          value: "32"
-       - name: CONCURRENCY
-         value: "1"
-       - name: LOGGING_LEVEL_ROOT
-         value: DEBUG
-       - name: MAX_IMAGE_FORMAT
-         value: "10"
-       - name: MAX_REL_IMAGE_SIZE
-         value: "0.75"
+       - name: AVAILABLE_MEMORY
+         value: "6000"
        - name: MINIMUM_FREE_MEMORY_PERCENTAGE
          value: "0.3"
-       - name: MIN_IMAGE_FORMAT
-         value: "0.1"
-       - name: MIN_REL_IMAGE_SIZE
-         value: "0.05"
-       - name: MONITORING_ENABLED
-         value: "true"
-       - name: MONITOR_MEMORY_USAGE
-         value: "true"
        - name: RABBITMQ_HEARTBEAT
          value: "7200"
-       - name: RABBITMQ_HOST
-         value: red-research-rabbitmq
-       - name: RABBITMQ_USERNAME
-         value: user
-       - name: RUN_ID
-         value: fabfb1f192c745369b88cab34471aba7
-       - name: STORAGE_BUCKET_NAME
-         value: redaction
-       - name: STORAGE_ENDPOINT
-         value: red-research-minio-headless
+       - name: MONITOR_MEMORY_USAGE
+         value: "true"
        - name: VERBOSE
          value: "true"
+       - name: RUN_ID
+         value: fabfb1f192c745369b88cab34471aba7
+       - name: MIN_REL_IMAGE_SIZE
+         value: "0.05"
+       - name: MAX_REL_IMAGE_SIZE
+         value: "0.75"
+       - name: MIN_IMAGE_FORMAT
+         value: "0.1"
+       - name: MAX_IMAGE_FORMAT
+         value: "10"
+       - name: LOGGING_LEVEL_ROOT
+         value: DEBUG
+       - name: CONCURRENCY
+         value: "1"
+       - name: MONITORING_ENABLED
+         value: "true"
+       - name: RABBITMQ_HOST
+         value: red-research2-rabbitmq
+       - name: RABBITMQ_USERNAME
+         value: user
        - name: RABBITMQ_PASSWORD
          valueFrom:
            secretKeyRef:
              key: rabbitmq-password
-             name: red-research-rabbitmq
+             name: red-research2-rabbitmq
              optional: false
+       - name: STORAGE_ENDPOINT
+         value: red-research2-minio-headless
+       - name: STORAGE_BUCKET_NAME
+         value: redaction
        - name: STORAGE_KEY
          valueFrom:
            secretKeyRef:
              key: root-user
-             name: red-research-minio
+             name: red-research2-minio
              optional: false
        - name: STORAGE_SECRET
          valueFrom:
            secretKeyRef:
              key: root-password
-             name: red-research-minio
+             name: red-research2-minio
              optional: false
        envFrom:
        - configMapRef:
@@ -136,7 +138,7 @@ spec:
      - command:
        - sh
        - -c
-       - until nc -z -w 10 red-research-rabbitmq 5672; do echo waiting for rabbitmq;
+       - until nc -z -w 10 red-research2-rabbitmq 5672; do echo waiting for rabbitmq;
          done; echo rabbitmq found
        image: nexus.iqser.com:5001/infra/busybox:1.33.1
        imagePullPolicy: Always
diff --git a/mini_queue/run.py b/mini_queue/run.py
index 8db7f14..e598daa 100644
--- a/mini_queue/run.py
+++ b/mini_queue/run.py
@@ -1,49 +1,32 @@
 import logging
-import time
 
 import pika
 
 from mini_queue.utils.config import CONFIG
-
-
-def callback(channel, method, properties, body):
-    logging.info(" [R] Received %r" % body)
-    time.sleep(1)
-    response = body
-    channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
-    channel.basic_ack(delivery_tag=method.delivery_tag)
-
-
-def init_params():
-    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
-    parameters = pika.ConnectionParameters(
-        host=CONFIG.rabbitmq.host,
-        port=CONFIG.rabbitmq.port,
-        heartbeat=CONFIG.rabbitmq.heartbeat,
-        credentials=credentials,
-    )
-    return parameters
+from mini_queue.utils.rabbitmq import make_channel, declare_queue, make_callback, read_connection_params
 
 
 def main():
     logging.info(" [S] Started happy pikachu!")
-    parameters = init_params()
+    parameters = read_connection_params()
     connection = pika.BlockingConnection(parameters)
-    channel = connection.channel()
-    # channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
+    channel = make_channel(connection)
+    declare_queue(channel, CONFIG.rabbitmq.queues.input)
 
     while True:
         try:
-            channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=callback)
+            channel.basic_consume(
+                queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback()
+            )
             logging.info(" [*] Waiting for messages. To exit press CTRL+C")
             channel.start_consuming()
         except pika.exceptions.ConnectionClosedByBroker as err:
-            logging.info("Caught a channel error: {}, stopping...".format(err))
+            logging.info(f"Caught a connection error: {err}, retrying...")
             continue
         except pika.exceptions.AMQPChannelError as err:
-            logging.warning("Caught a channel error: {}, stopping...".format(err))
+            logging.critical(f"Caught a channel error: {err}, stopping...")
             break
         except pika.exceptions.AMQPConnectionError:
             logging.info("Connection was closed, retrying...")
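Note on configuration resolution: the STORAGE_* variables injected by the Deployment above are consumed through the $VAR|default placeholders in config.yaml. A minimal sketch of how one placeholder resolves, assuming the envyaml package from requirements.txt handles the $VARIABLE|default syntax as used in config.yaml (key names taken from that file, values illustrative):

    import os

    from envyaml import EnvYAML

    # Without the variable set, the default after "|" applies.
    os.environ.pop("STORAGE_ENDPOINT", None)
    print(EnvYAML("config.yaml")["minio.host"])  # -> localhost

    # In the cluster, the Deployment's env entry overrides the default.
    os.environ["STORAGE_ENDPOINT"] = "red-research2-minio-headless"
    print(EnvYAML("config.yaml")["minio.host"])  # -> red-research2-minio-headless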
diff --git a/mini_queue/utils/file.py b/mini_queue/utils/file.py
new file mode 100644
index 0000000..9692bea
--- /dev/null
+++ b/mini_queue/utils/file.py
@@ -0,0 +1,88 @@
+"""Defines utilities for different operations on files."""
+
+
+import gzip
+import logging
+import os
+import shutil
+import tempfile
+from operator import itemgetter
+
+
+def provide_directory(path):
+    if os.path.isfile(path):
+        provide_directory(os.path.dirname(path))
+    if not os.path.isdir(path):
+        try:
+            os.makedirs(path)
+        except FileExistsError:
+            pass
+
+
+def produce_compressed_storage_pdf_object_name(path_no_ext, ext="pdf"):
+    return f"{path_no_ext}.ORIGIN.{ext}.gz"
+
+
+def dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id):
+    path_no_ext = os.path.join(dossier_id, file_id)
+    pdf_object_name = produce_compressed_storage_pdf_object_name(path_no_ext)
+    return pdf_object_name
+
+
+def path_to_compressed_storage_pdf_object_name(path):
+    path_no_ext, _ = os.path.splitext(path)
+    path_gz = produce_compressed_storage_pdf_object_name(path_no_ext)
+    return path_gz
+
+
+def unzip(gz_path, pdf_dir):
+    def inner():
+
+        path, ext = os.path.splitext(gz_path)
+        basename = os.path.basename(path)
+        dossier_id = os.path.basename(os.path.dirname(gz_path))
+        target_dir = os.path.join(pdf_dir, dossier_id)
+        provide_directory(target_dir)
+        target_path = os.path.join(target_dir, basename)
+
+        assert ext == ".gz"
+
+        logging.debug(f"unzipping {gz_path} into {target_path}")
+
+        with gzip.open(gz_path, "rb") as f_in:
+            with open(target_path, "wb") as f_out:
+                shutil.copyfileobj(f_in, f_out)
+
+        logging.debug(f"unzipped {gz_path} into {target_path}")
+
+        return target_path
+
+    try:
+        unzipped_file_path = inner()
+    finally:
+        shutil.rmtree(os.path.dirname(gz_path))
+
+    return unzipped_file_path
+
+
+def download(storage_client, object_name, target_root_dir):
+    downloaded_file_path = storage_client.download_file(object_name, target_root_dir=target_root_dir)
+    logging.debug(f"Downloaded {object_name} into {downloaded_file_path}.")
+    return downloaded_file_path
+
+
+def download_pdf_from_storage_via_request_payload(storage_client, payload: dict, pdf_dir: str):
+
+    provide_directory(pdf_dir)
+
+    with tempfile.TemporaryDirectory() as pdf_compressed_dir:
+
+        dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
+        object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
+        downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
+        unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
+        return unzipped_file_path
+
+
+def get_file_paths(directory):
+    return [os.path.join(directory, f) for f in os.listdir(directory)]
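The helpers above pin down the object-naming convention for the store: a dossier/file pair maps onto dossierId/fileId.ORIGIN.pdf.gz. A small illustration with made-up IDs (POSIX path separators assumed):

    from mini_queue.utils.file import (
        dossier_id_and_file_id_to_compressed_storage_pdf_object_name,
        path_to_compressed_storage_pdf_object_name,
    )

    # A dossier-ID and file-ID combine into one compressed object name ...
    assert dossier_id_and_file_id_to_compressed_storage_pdf_object_name("3403", "234") == "3403/234.ORIGIN.pdf.gz"

    # ... and a local PDF path is mapped onto the same scheme.
    assert path_to_compressed_storage_pdf_object_name("3403/234.pdf") == "3403/234.ORIGIN.pdf.gz"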
diff --git a/mini_queue/utils/meta.py b/mini_queue/utils/meta.py
new file mode 100644
index 0000000..722935c
--- /dev/null
+++ b/mini_queue/utils/meta.py
@@ -0,0 +1,116 @@
+import functools
+import logging
+import time
+from functools import partial, wraps
+from math import exp
+from typing import Tuple, Type, Callable
+
+
+def invocation_counter(var: str):
+    def decorator(func):
+        invocation_count = 0
+
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            nonlocal invocation_count
+            invocation_count += 1
+
+            if var not in kwargs:
+                kwargs[var] = invocation_count
+
+            return func(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
+class NoAttemptsLeft(Exception):
+    pass
+
+
+class MaxTimeoutReached(Exception):
+    pass
+
+
+class _MethodDecoratorAdaptor(object):
+    def __init__(self, decorator, func):
+        self.decorator = decorator
+        self.func = func
+
+    def __call__(self, *args, **kwargs):
+        return self.decorator(self.func)(*args, **kwargs)
+
+    def __get__(self, obj, objtype):
+        return partial(self.__call__, obj)
+
+
+def auto_adapt_to_methods(decorator):
+    """Allows you to use the same decorator on methods and functions,
+    hiding the self argument from the decorator."""
+
+    def adapt(func):
+        return _MethodDecoratorAdaptor(decorator, func)
+
+    return adapt
+
+
+def max_attempts(
+    n_attempts: int = 5, exceptions: Tuple[Type[Exception], ...] = None, timeout: float = 0.1, max_timeout: float = 10
+) -> Callable:
+    """Function decorator that attempts to run the wrapped function a certain number of times. Timeouts increase
+    exponentially according to `Tₖ ≔ t eᵏ`, where `t` is the timeout factor `timeout` and `k` is the attempt number.
+    If `∑ᵢ Tᵢ > mₜ` at the `i`-th attempt, where `mₜ` is the maximum timeout, then the function raises
+    MaxTimeoutReached. If `k > mₐ`, where `mₐ` is the maximum number of attempts allowed, then the function
+    raises NoAttemptsLeft.
+
+    Args:
+        n_attempts: Number of times to attempt running the wrapped function.
+        exceptions: Exceptions to catch for a re-attempt.
+        timeout: Timeout factor in seconds.
+        max_timeout: Maximum allowed timeout.
+
+    Raises:
+        MaxTimeoutReached
+        NoAttemptsLeft
+
+    Returns:
+        Wrapped function.
+    """
+    if not exceptions:
+        exceptions = (Exception,)
+    assert isinstance(exceptions, tuple)
+
+    @auto_adapt_to_methods
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            def run_attempt(attempt, timeout_aggr=0):
+                if attempt:
+                    try:
+                        return func(*args, **kwargs)
+                    except exceptions as err:
+
+                        attempt_num = n_attempts - attempt + 1
+                        next_timeout = timeout * exp(attempt_num - 1)  # start with timeout * e^0 = timeout
+
+                        logging.warning(f"{func.__name__} failed; attempt {attempt_num} of {n_attempts}")
+
+                        time_left = max(0, max_timeout - timeout_aggr)
+                        if time_left:
+                            sleep_for = min(next_timeout, time_left)
+                            time.sleep(sleep_for)
+                            return run_attempt(attempt - 1, timeout_aggr + sleep_for)
+                        else:
+                            logging.exception(err)
+                            raise MaxTimeoutReached(
+                                f"{func.__name__} reached maximum timeout ({max_timeout}) after {attempt_num} attempts."
+                            )
+                else:
+                    raise NoAttemptsLeft(f"{func.__name__} failed {n_attempts} times; all attempts expended.")
+
+            return run_attempt(n_attempts)
+
+        return wrapper
+
+    return decorator
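max_attempts above is the retry workhorse for the storage layer. A usage sketch with illustrative names and numbers (with these arguments the attempts are exhausted well before the 5-second timeout budget, so NoAttemptsLeft is raised):

    from mini_queue.utils.meta import MaxTimeoutReached, NoAttemptsLeft, max_attempts

    @max_attempts(n_attempts=3, exceptions=(ConnectionError,), timeout=0.1, max_timeout=5)
    def flaky_call():
        # Retried after sleeps of roughly 0.1 * e^k seconds (k = 0, 1, 2).
        raise ConnectionError("backend unreachable")

    try:
        flaky_call()
    except (NoAttemptsLeft, MaxTimeoutReached) as err:
        print(err)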
diff --git a/mini_queue/utils/minio.py b/mini_queue/utils/minio.py
new file mode 100644
index 0000000..1b8c5c0
--- /dev/null
+++ b/mini_queue/utils/minio.py
@@ -0,0 +1,105 @@
+"""Implements a wrapper around a MinIO client that provides operations on the MinIO store required by the service."""
+import os
+from typing import Iterable
+
+from minio import Minio
+
+from mini_queue.utils.config import CONFIG
+from mini_queue.utils.storage import StorageHandle
+
+
+def get_minio_client(access_key=None, secret_key=None) -> Minio:
+    """Instantiates a minio.Minio client.
+
+    Args:
+        access_key: Access key for MinIO client (username).
+        secret_key: Secret key for MinIO client (password).
+
+    Returns:
+        A minio.Minio client.
+    """
+    access_key = CONFIG.minio.user if access_key is None else access_key
+    secret_key = CONFIG.minio.password if secret_key is None else secret_key
+
+    # TODO: secure=True/False?
+    return Minio(f"{CONFIG.minio.host}:{CONFIG.minio.port}", access_key=access_key, secret_key=secret_key, secure=False)
+
+
+class MinioHandle(StorageHandle):
+    """Wrapper around a MinIO client that provides operations on the MinIO store required by the service."""
+
+    def __init__(self):
+        """Initializes a MinioHandle"""
+        super().__init__()
+        self.client: Minio = get_minio_client()
+        self.default_container_name = CONFIG.minio.bucket
+        self.backend = "s3"
+
+    def _StorageHandle__provide_container(self, container_name):
+        if not self.client.bucket_exists(container_name):
+            self.client.make_bucket(container_name)
+
+    def _StorageHandle__add_file(self, path, storage_path, container_name=None):
+
+        if container_name is None:
+            container_name = self.default_container_name
+
+        self._StorageHandle__provide_container(container_name)
+
+        with open(path, "rb") as f:
+            stat = os.stat(path)
+            self.client.put_object(container_name, storage_path, f, stat.st_size)
+
+    def list_files(self, container_name=None) -> Iterable[str]:
+        """List all files in a container.
+
+        Args:
+            container_name: container to list files from.
+
+        Returns:
+            Iterable of filenames.
+        """
+        return self._list_files("object_name", container_name=container_name)
+
+    def get_objects(self, container_name=None):
+        """Gets all objects in a container.
+
+        Args:
+            container_name: container to get objects from.
+
+        Returns:
+            Iterable over all objects in the container.
+        """
+        if container_name is None:
+            container_name = self.default_container_name
+        yield from self.client.list_objects(container_name, recursive=True)
+
+    def _StorageHandle__list_containers(self):
+        return self.client.list_buckets()
+
+    def _StorageHandle__purge(self) -> None:
+        """Deletes all files and containers in the store."""
+        for container, obj in self.get_all_objects():
+            self.client.remove_object(container.name, obj.object_name)
+
+        for container in self.client.list_buckets():
+            self.client.remove_bucket(container.name)
+
+    def _StorageHandle__fget_object(self, container_name, object_name, target_path):
+        self.client.fget_object(container_name, object_name, target_path)
+
+    def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
+        """Removes a file from the store.
+
+        Args:
+            folder: Folder containing file.
+            filename: Name of file (without folder) to remove.
+            container_name: container containing file.
+        """
+        if container_name is None:
+            container_name = self.default_container_name
+
+        path = os.path.join(folder, filename)
+
+        if self.client.bucket_exists(container_name):
+            self.client.remove_object(container_name, path)
diff --git a/mini_queue/utils/rabbitmq.py b/mini_queue/utils/rabbitmq.py
new file mode 100644
index 0000000..e362895
--- /dev/null
+++ b/mini_queue/utils/rabbitmq.py
@@ -0,0 +1,66 @@
+import json
+import logging
+import tempfile
+import time
+from operator import itemgetter
+
+import pika
+
+from mini_queue.utils.config import CONFIG
+from mini_queue.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
+from mini_queue.utils.minio import MinioHandle
+
+
+def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
+    channel = connection.channel()
+    channel.basic_qos(prefetch_count=CONFIG.rabbitmq.prefetch_count)
+    return channel
+
+
+def declare_queue(channel, queue: str):
+    args = {
+        # "x-message-ttl": CONFIG.rabbitmq.message_ttl,
+        "x-dead-letter-exchange": "",
+        "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter,
+    }
+    return channel.queue_declare(queue=queue, auto_delete=False, arguments=args, durable=True)
+
+
+def make_callback():
+
+    # One MinioHandle is shared by all deliveries handled by this callback.
+    storage_client = MinioHandle()
+
+    def callback(channel, method, properties, body):
+        logging.info(f" [R] Received {body}")
+        response = json.dumps(process(body))
+        channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+
+    def process(payload):
+        payload = json.loads(payload)
+        with tempfile.TemporaryDirectory() as pdf_compressed_dir:
+            with tempfile.TemporaryDirectory() as pdf_dir:
+
+                dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
+                time.sleep(0.4)  # the sleeps simulate processing work for now
+                object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
+                time.sleep(0.6)
+                downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
+                time.sleep(0.3)
+                unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
+                time.sleep(1)
+                payload["imageMetadata"] = []
+                return payload  # the callback JSON-encodes the response exactly once
+
+    return callback
+
+
+def read_connection_params():
+    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
+    parameters = pika.ConnectionParameters(
+        host=CONFIG.rabbitmq.host,
+        port=CONFIG.rabbitmq.port,
+        heartbeat=CONFIG.rabbitmq.heartbeat,
+        credentials=credentials,
+    )
+    return parameters
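declare_queue above wires every work queue to the dead-letter queue from config.yaml via the default exchange. A hedged sketch of how a poisoned message would travel there (this consumer is illustrative, not part of the patch; the dead-letter queue itself is declared without dead-letter arguments to avoid a routing cycle):

    import pika

    from mini_queue.utils.config import CONFIG
    from mini_queue.utils.rabbitmq import declare_queue, make_channel, read_connection_params

    connection = pika.BlockingConnection(read_connection_params())
    channel = make_channel(connection)
    declare_queue(channel, CONFIG.rabbitmq.queues.input)
    channel.queue_declare(queue=CONFIG.rabbitmq.queues.dead_letter, durable=True)

    def reject(channel, method, properties, body):
        # basic_nack with requeue=False dead-letters the message; the broker then
        # republishes it via the default exchange to x-dead-letter-routing-key.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, on_message_callback=reject)
    channel.start_consuming()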
diff --git a/mini_queue/utils/storage.py b/mini_queue/utils/storage.py
new file mode 100644
index 0000000..22e4053
--- /dev/null
+++ b/mini_queue/utils/storage.py
@@ -0,0 +1,179 @@
+import abc
+import gzip
+import logging
+import os
+from itertools import repeat
+from operator import attrgetter
+from typing import Iterable
+
+from mini_queue.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
+from mini_queue.utils.meta import NoAttemptsLeft, max_attempts
+
+
+class StorageHandle(abc.ABC):
+    """Storage API base"""
+
+    def __init__(self):
+        self.default_container_name = None
+
+    @abc.abstractmethod
+    def __provide_container(self, container_name):
+        pass
+
+    @abc.abstractmethod
+    def __add_file(self, path, filename, container_name=None):
+        pass
+
+    @max_attempts(n_attempts=10, max_timeout=60)
+    def add_file(self, path: str, folder: str = None, container_name: str = None) -> None:
+        """Adds a file to the store.
+
+        Args:
+            path: Path to file to add to store.
+            folder: Folder to hold file.
+            container_name: container to hold file.
+        """
+        storage_path = self.__storage_path(path, folder=folder)
+        self.__add_file(path, storage_path, container_name)
+
+    @max_attempts()
+    def _list_files(self, object_name_attr="object_name", container_name=None) -> Iterable[str]:
+        """List all files in a container.
+
+        Args:
+            container_name: container to list files from.
+
+        Returns:
+            Iterable of filenames.
+        """
+        if container_name is None:
+            container_name = self.default_container_name
+        return map(attrgetter(object_name_attr), self.get_objects(container_name))
+
+    @abc.abstractmethod
+    def list_files(self, container_name=None) -> Iterable[str]:
+        pass
+
+    @abc.abstractmethod
+    def get_objects(self, container_name=None):
+        pass
+
+    @abc.abstractmethod
+    def __list_containers(self):
+        pass
+
+    @max_attempts()
+    def get_all_objects(self) -> Iterable:
+        """Gets all objects in the store.
+
+        Returns:
+            Iterable over all objects in the store.
+        """
+        for container in self.__list_containers():
+            yield from zip(repeat(container), self.get_objects(container.name))
+
+    @abc.abstractmethod
+    def __purge(self) -> None:
+        pass
+
+    @max_attempts()
+    def purge(self) -> None:
+        self.__purge()
+
+    def list_files_by_type(self, container_name=None, extension=".pdf.gz"):
+        return filter(lambda p: p.endswith(extension), self.list_files(container_name))
+
+    @abc.abstractmethod
+    def __fget_object(self, *args, **kwargs):
+        pass
+
+    @staticmethod
+    def __storage_path(path, folder: str = None):
+        def path_to_filename(path):
+            return os.path.basename(path)
+
+        storage_path = path_to_filename(path)
+        if folder is not None:
+            storage_path = os.path.join(folder, storage_path)
+
+        return storage_path
+
+    @max_attempts()
+    def list_folders_and_files(self, container_name: str = None) -> Iterable[str]:
+        """Lists pairs of folder name (dossier-ID) and file name (file-ID) of items in a container.
+
+        Args:
+            container_name: container to list items for.
+
+        Returns:
+            Iterable of pairs of folder name (dossier-ID) and file name (file-ID).
+        """
+        return map(lambda p: p.split("/"), self.list_files_by_type(container_name))
+
+    @abc.abstractmethod
+    def __remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
+        pass
+
+    @max_attempts()
+    def remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
+        self.__remove_file(folder, filename, container_name)
+
+    def add_file_compressed(self, path, folder: str = None, container_name: str = None) -> None:
+        """Adds a file as a .gz archive to the store.
+
+        Args:
+            path: Path to file to add to store.
+            folder: Folder to hold file.
+            container_name: container to hold file.
+        """
+
+        def compress(path_in: str, path_out: str):
+            with open(path_in, "rb") as f_in, gzip.open(path_out, "wb") as f_out:
+                f_out.writelines(f_in)
+
+        path_gz = path_to_compressed_storage_pdf_object_name(path)
+        compress(path, path_gz)
+
+        self.add_file(path_gz, folder, container_name)
+        os.unlink(path_gz)
+
+    @max_attempts()
+    def download_file(self, object_name: str, target_root_dir: str, container_name: str = None) -> str:
+        """Downloads a file from the store.
+
+        Args:
+            object_name: Complete object name (folder and file).
+            target_root_dir: Root directory to download file into (including its folder).
+            container_name: container to load file from.
+
+        Returns:
+            str: Path to downloaded file.
+        """
+
+        @max_attempts(5, exceptions=(FileNotFoundError,))
+        def download(object_name: str) -> str:
+
+            path, basename = os.path.split(object_name)
+            target_dir = os.path.join(target_root_dir, path)
+            provide_directory(target_dir)
+            target_path = os.path.join(target_dir, basename)
+
+            logging.debug(f"Downloading {object_name}...")
+            try:
+                self.__fget_object(container_name, object_name, target_path)
+                logging.debug(f"Downloaded {object_name}.")
+            except Exception as err:
+                logging.error(f"Downloading {object_name} failed.")
+                raise err
+            return target_path
+
+        if container_name is None:
+            container_name = self.default_container_name
+
+        try:
+            target_path = download(object_name)
+        except NoAttemptsLeft as err:
+            logging.error(f"{err}")
+            raise err
+
+        return target_path
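Since StorageHandle is an abc.ABC, a concrete backend has to supply every name-mangled hook under the base-class prefix, exactly as MinioHandle does with _StorageHandle__add_file and friends. A quick sketch of the contract (IncompleteHandle is a hypothetical class for illustration):

    from mini_queue.utils.storage import StorageHandle

    class IncompleteHandle(StorageHandle):
        pass  # none of the _StorageHandle__* hooks implemented

    try:
        IncompleteHandle()
    except TypeError as err:
        print(err)  # lists the abstract methods still missing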
diff --git a/requirements.txt b/requirements.txt
index bbd140c..780fe93 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,5 @@
 pika
 retry
 envyaml
+minio
+tqdm
diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py
new file mode 100644
index 0000000..5577f06
--- /dev/null
+++ b/scripts/manage_minio.py
@@ -0,0 +1,51 @@
+import argparse
+import os
+
+from tqdm import tqdm
+
+from mini_queue.utils.minio import MinioHandle
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+
+    subparsers = parser.add_subparsers(help="sub-command help", dest="command")
+
+    parser_add = subparsers.add_parser("add", help="Add file(s) to the MinIO store")
+    parser_add.add_argument("dossier_id")
+    add_group = parser_add.add_mutually_exclusive_group(required=True)
+    add_group.add_argument("--file", "-f")
+    add_group.add_argument("--directory", "-d")
+
+    parser_remove = subparsers.add_parser("remove", help="Remove a file from the MinIO store")
+    parser_remove.add_argument("dossier_id")
+    parser_remove.add_argument("file_path")
+
+    subparsers.add_parser("purge", help="Delete all files and buckets in the MinIO store")
+
+    args = parser.parse_args()
+    return args
+
+
+if __name__ == "__main__":
+
+    client = MinioHandle()
+
+    args = parse_args()
+
+    if args.command == "add":
+
+        if args.file:
+            client.add_file_compressed(args.file, folder=args.dossier_id)
+
+        elif args.directory:
+            for fname in tqdm([*os.listdir(args.directory)], desc="Adding files"):
+                path = os.path.join(args.directory, fname)
+                client.add_file_compressed(path, folder=args.dossier_id)
+
+    elif args.command == "remove":
+        fname = os.path.basename(args.file_path)
+        client.remove_file(folder=args.dossier_id, filename=fname)
+
+    elif args.command == "purge":
+        client.purge()
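For reference, the subcommands above map directly onto MinioHandle calls; the equivalent of python scripts/manage_minio.py add 3403 --file ./234.pdf is roughly (IDs and paths illustrative):

    from mini_queue.utils.minio import MinioHandle

    client = MinioHandle()
    client.add_file_compressed("./234.pdf", folder="3403")  # stored as 3403/234.ORIGIN.pdf.gz

    # dossier-ID / file-name pairs now visible in the bucket:
    print(list(client.list_folders_and_files()))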
diff --git a/scripts/mock_knock.py b/scripts/mock_knock.py
new file mode 100644
index 0000000..f30da16
--- /dev/null
+++ b/scripts/mock_knock.py
@@ -0,0 +1,38 @@
+import json
+
+import pika
+
+from mini_queue.utils.config import CONFIG
+from mini_queue.utils.minio import MinioHandle
+from mini_queue.utils.rabbitmq import make_channel, declare_queue
+
+
+def build_message_bodies():
+    minio_client = MinioHandle()
+    for dossier_id, pdf_name in minio_client.list_folders_and_files():
+        file_id = pdf_name.split(".")[0]
+        yield json.dumps({"dossierId": dossier_id, "fileId": file_id})
+
+
+if __name__ == "__main__":
+    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
+    parameters = pika.ConnectionParameters(
+        host=CONFIG.rabbitmq.host,
+        port=CONFIG.rabbitmq.port,
+        heartbeat=CONFIG.rabbitmq.heartbeat,
+        credentials=credentials,
+    )
+
+    connection = pika.BlockingConnection(parameters)
+    channel = make_channel(connection)
+
+    declare_queue(channel, CONFIG.rabbitmq.queues.input)
+    declare_queue(channel, CONFIG.rabbitmq.queues.output)
+
+    for body in build_message_bodies():
+        channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
+        print(f" [x] Put {body} on {CONFIG.rabbitmq.queues.input}")
+
+    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
+        print(json.loads(body))
+        channel.basic_ack(method_frame.delivery_tag)
diff --git a/scripts/mock_publish.py b/scripts/mock_publish.py
deleted file mode 100644
index 336998f..0000000
--- a/scripts/mock_publish.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import json
-
-import pika
-
-from mini_queue.utils.config import CONFIG
-
-
-if __name__ == "__main__":
-
-    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
-    parameters = pika.ConnectionParameters(
-        host=CONFIG.rabbitmq.host,
-        port=CONFIG.rabbitmq.port,
-        heartbeat=CONFIG.rabbitmq.heartbeat,
-        credentials=credentials,
-    )
-    body = json.dumps({"fileId": "234", "dossierId": "3403"})
-    connection = pika.BlockingConnection(parameters)
-    channel = connection.channel()
-
-    channel.queue_declare(queue=CONFIG.rabbitmq.queues.input, durable=True)
-    channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
-
-    channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
-
-    print(f" [x] Put {body} on {CONFIG.rabbitmq.queues.input}")
-
-    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
-        print(json.loads(body))
-        break
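End to end, the scripts above give a local smoke test: seed MinIO with manage_minio.py, start the service, then knock. A condensed round-trip check distilled from mock_knock.py (a sketch; it assumes the docker-compose services are up and the bucket is seeded, and the 30-second inactivity timeout is an illustrative addition):

    import json

    import pika

    from mini_queue.utils.config import CONFIG
    from mini_queue.utils.rabbitmq import declare_queue, make_channel, read_connection_params

    connection = pika.BlockingConnection(read_connection_params())
    channel = make_channel(connection)
    declare_queue(channel, CONFIG.rabbitmq.queues.input)
    declare_queue(channel, CONFIG.rabbitmq.queues.output)

    channel.basic_publish("", CONFIG.rabbitmq.queues.input, json.dumps({"dossierId": "3403", "fileId": "234"}))

    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=30):
        if method_frame is None:
            raise TimeoutError("no response within 30 seconds")
        response = json.loads(body)
        assert "imageMetadata" in response  # the service echoes the request plus its results
        channel.basic_ack(method_frame.delivery_tag)
        break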