Pull request #1: Add storage handle
Merge in RR/mini_queue from add-storage-handle to master
Squashed commit of the following:
commit 03e542d2a65802c28735873fae184209f0c83553
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 11:55:34 2022 +0100
Quickfix typo
commit b4d538e9445187435d87c5cf8ce1f4e448021129
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 11:41:42 2022 +0100
added prefetch count and make channel function
commit d46d1375e387d36641c06b062a8ccc54f114ef4c
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 11:20:39 2022 +0100
black on M.s request
commit bc47b20312a978f19b08531804bf42b00f0a88f0
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 11:19:57 2022 +0100
changed response
commit 9a475ecd8df9ca007e5f7fe146483b6403eccc3b
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 10:15:08 2022 +0100
.
commit 108bc3ea90d867575db8c1b1503c9df859222485
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 09:56:56 2022 +0100
quickrestore
commit ae04d17d8d041f612d86117e8e96c96ddffcbde3
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 09:37:30 2022 +0100
refactor
commit 68051a72eb93868eba8adba234258b9e5373ecaa
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 08:50:59 2022 +0100
added answer file template for rancher
commit 09ef45ead51c07732a20133acad0b8b2ae7d0a61
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 08:26:05 2022 +0100
Quickfix inconsistency
commit d925b0f3f91f29403c88fb6149566ec966af2973
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Wed Feb 16 08:20:40 2022 +0100
Quick refactor
commit 48795455cde8d97ed98e58c3004a87a26f331352
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Tue Feb 15 17:46:45 2022 +0100
bluckckck
commit 80e58efab0269dc513990f83b14ceb36b3e4dd8e
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Tue Feb 15 17:45:49 2022 +0100
Quick restatus setting
commit 83f276ee13348a678b7da84e25ca844dd348b4c9
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Tue Feb 15 17:30:16 2022 +0100
Quickreset to working status
commit d44cdcf922250639a6832cc3e16d0d967d9853fb
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date: Tue Feb 15 14:44:26 2022 +0100
added storage handle for minio WIP
This commit is contained in:
parent
9c85222c78
commit
4edf04ab24
1
.dockerignore
Normal file
1
.dockerignore
Normal file
@ -0,0 +1 @@
|
||||
data
|
||||
11
config.yaml
11
config.yaml
@ -9,7 +9,16 @@ rabbitmq:
|
||||
queues: # Hack image-service queues
|
||||
input: image_request_queue # requests to service
|
||||
output: image_response_queue # responses by service
|
||||
dead_letter: image_letter_queue # messages that failed to process
|
||||
dead_letter: image_dead_letter_queue # messages that failed to process
|
||||
|
||||
prefetch_count: 2
|
||||
|
||||
minio:
|
||||
host: $STORAGE_ENDPOINT|localhost # MinIO host address
|
||||
port: $STORAGE_PORT|9000 # MinIO host port
|
||||
user: $STORAGE_KEY|root # MinIO user name
|
||||
password: $STORAGE_SECRET|password # MinIO user password
|
||||
bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket
|
||||
|
||||
service:
|
||||
|
||||
|
||||
@ -1,5 +1,16 @@
|
||||
version: '2'
|
||||
services:
|
||||
minio:
|
||||
image: minio/minio
|
||||
ports:
|
||||
- "9000:9000"
|
||||
environment:
|
||||
- MINIO_ROOT_PASSWORD=password
|
||||
- MINIO_ROOT_USER=root
|
||||
volumes:
|
||||
- ./data/minio_store:/data
|
||||
command: server /data
|
||||
network_mode: "bridge"
|
||||
rabbitmq:
|
||||
image: docker.io/bitnami/rabbitmq:3.9
|
||||
ports:
|
||||
|
||||
@ -2,25 +2,25 @@ apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
annotations:
|
||||
meta.helm.sh/release-name: red-research
|
||||
meta.helm.sh/release-namespace: red-research
|
||||
meta.helm.sh/release-name: red-research2
|
||||
meta.helm.sh/release-namespace: red-research2
|
||||
labels:
|
||||
apiVersion: v2
|
||||
app: image-service
|
||||
app.kubernetes.io/instance: red-research
|
||||
app.kubernetes.io/instance: red-research2
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
app.kubernetes.io/name: redaction
|
||||
helm.sh/chart: redaction
|
||||
io.cattle.field/appId: red-research
|
||||
io.cattle.field/appId: red-research2
|
||||
type: service
|
||||
name: mini-queue
|
||||
namespace: red-research
|
||||
namespace: red-research2
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
apiVersion: v2
|
||||
app: image-service
|
||||
io.cattle.field/appId: red-research
|
||||
io.cattle.field/appId: red-research2
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
@ -30,7 +30,7 @@ spec:
|
||||
labels:
|
||||
apiVersion: v2
|
||||
app: image-service
|
||||
io.cattle.field/appId: red-research
|
||||
io.cattle.field/appId: red-research2
|
||||
spec:
|
||||
affinity:
|
||||
podAntiAffinity:
|
||||
@ -49,55 +49,57 @@ spec:
|
||||
- env:
|
||||
- name: BATCH_SIZE
|
||||
value: "32"
|
||||
- name: CONCURRENCY
|
||||
value: "1"
|
||||
- name: LOGGING_LEVEL_ROOT
|
||||
value: DEBUG
|
||||
- name: MAX_IMAGE_FORMAT
|
||||
value: "10"
|
||||
- name: MAX_REL_IMAGE_SIZE
|
||||
value: "0.75"
|
||||
- name: AVAILABLE_MEMORY
|
||||
value: "6000"
|
||||
- name: MINIMUM_FREE_MEMORY_PERCENTAGE
|
||||
value: "0.3"
|
||||
- name: MIN_IMAGE_FORMAT
|
||||
value: "0.1"
|
||||
- name: MIN_REL_IMAGE_SIZE
|
||||
value: "0.05"
|
||||
- name: MONITORING_ENABLED
|
||||
value: "true"
|
||||
- name: MONITOR_MEMORY_USAGE
|
||||
value: "true"
|
||||
- name: RABBITMQ_HEARTBEAT
|
||||
value: "7200"
|
||||
- name: RABBITMQ_HOST
|
||||
value: red-research-rabbitmq
|
||||
- name: RABBITMQ_USERNAME
|
||||
value: user
|
||||
- name: RUN_ID
|
||||
value: fabfb1f192c745369b88cab34471aba7
|
||||
- name: STORAGE_BUCKET_NAME
|
||||
value: redaction
|
||||
- name: STORAGE_ENDPOINT
|
||||
value: red-research-minio-headless
|
||||
- name: MONITOR_MEMORY_USAGE
|
||||
value: "true"
|
||||
- name: VERBOSE
|
||||
value: "true"
|
||||
- name: RUN_ID
|
||||
value: fabfb1f192c745369b88cab34471aba7
|
||||
- name: MIN_REL_IMAGE_SIZE
|
||||
value: "0.05"
|
||||
- name: MAX_REL_IMAGE_SIZE
|
||||
value: "0.75"
|
||||
- name: MIN_IMAGE_FORMAT
|
||||
value: "0.1"
|
||||
- name: MAX_IMAGE_FORMAT
|
||||
value: "10"
|
||||
- name: LOGGING_LEVEL_ROOT
|
||||
value: DEBUG
|
||||
- name: CONCURRENCY
|
||||
value: "1"
|
||||
- name: MONITORING_ENABLED
|
||||
value: "true"
|
||||
- name: RABBITMQ_HOST
|
||||
value: red-research2-rabbitmq
|
||||
- name: RABBITMQ_USERNAME
|
||||
value: user
|
||||
- name: RABBITMQ_PASSWORD
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
key: rabbitmq-password
|
||||
name: red-research-rabbitmq
|
||||
name: red-research2-rabbitmq
|
||||
- name: STORAGE_ENDPOINT
|
||||
value: red-research2-minio-headless
|
||||
- name: STORAGE_BUCKET_NAME
|
||||
value: redaction
|
||||
optional: false
|
||||
- name: STORAGE_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
key: root-user
|
||||
name: red-research-minio
|
||||
name: red-research2-minio
|
||||
optional: false
|
||||
- name: STORAGE_SECRET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
key: root-password
|
||||
name: red-research-minio
|
||||
name: red-research2-minio
|
||||
optional: false
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
@ -136,7 +138,7 @@ spec:
|
||||
- command:
|
||||
- sh
|
||||
- -c
|
||||
- until nc -z -w 10 red-research-rabbitmq 5672; do echo waiting for rabbitmq;
|
||||
- until nc -z -w 10 red-research2-rabbitmq 5672; do echo waiting for rabbitmq;
|
||||
done; echo rabbitmq found
|
||||
image: nexus.iqser.com:5001/infra/busybox:1.33.1
|
||||
imagePullPolicy: Always
|
||||
|
||||
@ -1,49 +1,32 @@
|
||||
import logging
|
||||
import time
|
||||
import pika
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
|
||||
|
||||
def callback(channel, method, properties, body):
|
||||
logging.info(" [R] Received %r" % body)
|
||||
time.sleep(1)
|
||||
response = body
|
||||
channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
|
||||
channel.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
def init_params():
|
||||
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=CONFIG.rabbitmq.host,
|
||||
port=CONFIG.rabbitmq.port,
|
||||
heartbeat=CONFIG.rabbitmq.heartbeat,
|
||||
credentials=credentials,
|
||||
)
|
||||
return parameters
|
||||
from mini_queue.utils.rabbitmq import make_channel, declare_queue, make_callback, read_connection_params
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
logging.info(" [S] Startet happy pikachu!")
|
||||
|
||||
parameters = init_params()
|
||||
parameters = read_connection_params()
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
# channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
|
||||
channel = make_channel(connection)
|
||||
declare_queue(channel, CONFIG.rabbitmq.queues.input)
|
||||
|
||||
while True:
|
||||
try:
|
||||
channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=callback)
|
||||
channel.basic_consume(
|
||||
queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback()
|
||||
)
|
||||
logging.info(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
channel.start_consuming()
|
||||
|
||||
except pika.exceptions.ConnectionClosedByBroker as err:
|
||||
logging.info("Caught a channel error: {}, stopping...".format(err))
|
||||
logging.info(f"Caught a channel error: {err}, retrying...")
|
||||
continue
|
||||
except pika.exceptions.AMQPChannelError as err:
|
||||
logging.warning("Caught a channel error: {}, stopping...".format(err))
|
||||
logging.critical(f"Caught a channel error: {err}, stopping...")
|
||||
break
|
||||
except pika.exceptions.AMQPConnectionError:
|
||||
logging.info("Connection was closed, retrying...")
|
||||
|
||||
88
mini_queue/utils/file.py
Normal file
88
mini_queue/utils/file.py
Normal file
@ -0,0 +1,88 @@
|
||||
"""Defines utilities for different operations on files."""
|
||||
|
||||
|
||||
import gzip
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from operator import itemgetter
|
||||
|
||||
|
||||
def provide_directory(path):
    """Make sure *path* exists as a directory, tolerating races.

    If *path* currently points at a file, the parent directory is ensured
    instead (the subsequent ``makedirs`` then fails with FileExistsError,
    which is swallowed). Concurrent creation by another process is also
    tolerated.
    """
    if os.path.isfile(path):
        provide_directory(os.path.dirname(path))
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except FileExistsError:
        pass
|
||||
|
||||
|
||||
def produce_compressed_storage_pdf_object_name(path_no_ext, ext="pdf"):
    """Build the object name of a compressed original document in the store."""
    return ".".join((path_no_ext, "ORIGIN", ext, "gz"))
|
||||
|
||||
|
||||
def dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id):
    """Map a (dossier id, file id) pair to its compressed PDF object name."""
    # Same result as produce_compressed_storage_pdf_object_name with the
    # default "pdf" extension, with the helper inlined.
    return f"{os.path.join(dossier_id, file_id)}.ORIGIN.pdf.gz"
|
||||
|
||||
|
||||
def path_to_compressed_storage_pdf_object_name(path):
    """Return the compressed-storage object name for *path*.

    The original extension is stripped and replaced by the canonical
    ``.ORIGIN.pdf.gz`` suffix.

    Args:
        path: Local file path (any extension).

    Returns:
        The object name string for the store.
    """
    # The original extension is discarded; the previous code bound it to an
    # unused local variable.
    path_no_ext, _ = os.path.splitext(path)
    return produce_compressed_storage_pdf_object_name(path_no_ext)
|
||||
|
||||
|
||||
def unzip(gz_path, pdf_dir):
    """Decompress a downloaded ``.gz`` archive into *pdf_dir*.

    The dossier id is recovered from the name of the directory that
    directly contains *gz_path*, and the unpacked file is placed under
    ``pdf_dir/<dossier_id>/<basename>``. The directory containing the
    compressed source is removed afterwards, even if unzipping fails.

    Args:
        gz_path: Path to the ``.gz`` archive (must end in ``.gz``).
        pdf_dir: Root directory to unpack into.

    Returns:
        Path to the decompressed file.
    """

    def inner():

        path, ext = os.path.splitext(gz_path)
        basename = os.path.basename(path)
        # The parent directory name of the archive encodes the dossier id.
        dossier_id = os.path.basename(os.path.dirname(gz_path))
        target_dir = os.path.join(pdf_dir, dossier_id)
        provide_directory(target_dir)
        target_path = os.path.join(target_dir, basename)

        # NOTE(review): assert is stripped under `python -O`; consider a
        # ValueError if non-.gz inputs can reach this point.
        assert ext == ".gz"

        logging.debug(f"unzipping {gz_path} into {target_path}")

        with gzip.open(gz_path, "rb") as f_in:
            with open(target_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)

        logging.debug(f"unzipped {gz_path} into {target_path}")

        return target_path

    try:
        unzipped_file_path = inner()
    finally:
        # The compressed source directory is always discarded, even on failure.
        shutil.rmtree(os.path.dirname(gz_path))

    return unzipped_file_path
|
||||
|
||||
|
||||
def download(storage_client, object_name, target_root_dir):
    """Fetch *object_name* via *storage_client* and return the local path."""
    local_path = storage_client.download_file(object_name, target_root_dir=target_root_dir)
    logging.debug(f"Downloaded {object_name} into {local_path}.")
    return local_path
|
||||
|
||||
|
||||
def download_pdf_from_storage_via_request_payload(storage_client, payload: dict, pdf_dir: str):
    """Download and decompress the PDF referenced by a request *payload*.

    Args:
        storage_client: Storage handle providing ``download_file``.
        payload: Request message; must contain ``dossierId`` and ``fileId``.
        pdf_dir: Directory to place the unpacked PDF under.

    Returns:
        Path to the decompressed PDF file.
    """

    provide_directory(pdf_dir)

    # The compressed download lives in a throwaway directory; only the
    # unpacked result under pdf_dir survives this call.
    with tempfile.TemporaryDirectory() as pdf_compressed_dir:

        dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
        object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
        downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
        unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
        return unzipped_file_path
|
||||
|
||||
|
||||
def get_file_paths(directory):
    """Return full paths of all entries directly inside *directory*."""
    entries = os.listdir(directory)
    return [os.path.join(directory, entry) for entry in entries]
|
||||
116
mini_queue/utils/meta.py
Normal file
116
mini_queue/utils/meta.py
Normal file
@ -0,0 +1,116 @@
|
||||
import functools
|
||||
import logging
|
||||
import time
|
||||
from functools import partial, wraps
|
||||
from math import exp
|
||||
from typing import Tuple, Type, Callable
|
||||
|
||||
|
||||
def invocation_counter(var: str):
    """Decorator factory: inject a running call count as keyword *var*.

    Every call to the wrapped function increments a private counter; the
    current count is supplied as ``kwargs[var]`` unless the caller already
    passed that keyword explicitly.
    """

    def decorator(func):
        call_count = 0

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal call_count
            call_count += 1

            # An explicit caller-provided value always wins over the counter.
            kwargs.setdefault(var, call_count)

            return func(*args, **kwargs)

        return wrapper

    return decorator
|
||||
|
||||
|
||||
class NoAttemptsLeft(Exception):
    """Raised by ``max_attempts`` when every retry attempt has been used up."""

    pass
|
||||
|
||||
|
||||
class MaxTimeoutReached(Exception):
    """Raised by ``max_attempts`` when the accumulated sleep time exceeds the maximum timeout."""

    pass
|
||||
|
||||
|
||||
class _MethodDecoratorAdaptor(object):
|
||||
def __init__(self, decorator, func):
|
||||
self.decorator = decorator
|
||||
self.func = func
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.decorator(self.func)(*args, **kwargs)
|
||||
|
||||
def __get__(self, obj, objtype):
|
||||
return partial(self.__call__, obj)
|
||||
|
||||
|
||||
def auto_adapt_to_methods(decorator):
    """Allows you to use the same decorator on methods and functions,
    hiding the self argument from the decorator."""

    def adapt(func):
        # Delegate binding behavior to the descriptor adaptor.
        return _MethodDecoratorAdaptor(decorator, func)

    return adapt
|
||||
|
||||
|
||||
def max_attempts(
    n_attempts: int = 5, exceptions: Tuple[Type[Exception], ...] = None, timeout: float = 0.1, max_timeout: float = 10
) -> Callable:
    """Function decorator that attempts to run the wrapped function a certain number of times. Timeouts increase
    exponentially according to `Tₖ ≔ t eᵏ`, where `t` is the timeout factor `timeout` and `k` is the attempt number.
    If `∑ᵢ Tᵢ > mₜ` at the `i-th` attempt, where `mₜ` is the maximum timeout, then the function raises
    MaxTimeoutReached. If `k > mₐ`, where `mₐ` is the maximum number of attempts allowed, then the function
    raises NoAttemptsLeft.

    Args:
        n_attempts: Number of times to attempt running the wrapped function.
        exceptions: Exceptions to catch for a re-attempt.
        timeout: Timeout factor in seconds.
        max_timeout: Maximum allowed timeout.

    Raises:
        MaxTimeoutReached
        NoAttemptsLeft

    Returns:
        Wrapped function.
    """
    if not exceptions:
        exceptions = (Exception,)
    assert isinstance(exceptions, tuple)

    @auto_adapt_to_methods
    def inner(func):
        @wraps(func)
        def inner(*args, **kwargs):
            def run_attempt(attempt, timeout_aggr=0):
                if attempt:
                    try:
                        return func(*args, **kwargs)
                    except exceptions as err:

                        attempt_num = n_attempts - attempt + 1
                        next_timeout = timeout * exp(attempt_num - 1)  # start with timeout * e^0 = timeout

                        # `logging.warn` is a deprecated alias; use `warning`.
                        logging.warning(f"{func.__name__} failed; attempt {attempt_num} of {n_attempts}")

                        time_left = max(0, max_timeout - timeout_aggr)
                        if time_left:
                            sleep_for = min(next_timeout, time_left)
                            time.sleep(sleep_for)
                            return run_attempt(attempt - 1, timeout_aggr + sleep_for)
                        else:
                            logging.exception(err)
                            raise MaxTimeoutReached(
                                f"{func.__name__} reached maximum timeout ({max_timeout}) after {attempt_num} attempts."
                            )
                else:
                    raise NoAttemptsLeft(f"{func.__name__} failed {n_attempts} times; all attempts expended.")

            return run_attempt(n_attempts)

        return inner

    return inner
|
||||
105
mini_queue/utils/minio.py
Normal file
105
mini_queue/utils/minio.py
Normal file
@ -0,0 +1,105 @@
|
||||
"""Implements a wrapper around a MinIO client that provides operations on the MinIO store required by the service."""
|
||||
import os
|
||||
from typing import Iterable
|
||||
|
||||
from minio import Minio
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
from mini_queue.utils.storage import StorageHandle
|
||||
|
||||
|
||||
def get_minio_client(access_key=None, secret_key=None) -> Minio:
    """Create a ``minio.Minio`` client from CONFIG, with optional overrides.

    Args:
        access_key: Access key (username); falls back to ``CONFIG.minio.user``.
        secret_key: Secret key (password); falls back to ``CONFIG.minio.password``.

    Returns:
        A configured ``minio.Minio`` client.
    """
    if access_key is None:
        access_key = CONFIG.minio.user
    if secret_key is None:
        secret_key = CONFIG.minio.password

    endpoint = f"{CONFIG.minio.host}:{CONFIG.minio.port}"
    # TODO: secure=True/False?
    return Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=False)
|
||||
|
||||
|
||||
class MinioHandle(StorageHandle):
    """Wrapper around a MinIO client that provides operations on the MinIO store required by the service."""

    # NOTE(review): methods named `_StorageHandle__*` deliberately match the
    # name-mangled form of the base class's double-underscore hooks
    # (e.g. `__provide_container` in StorageHandle). Defining them with the
    # mangled name here is what lets the base-class code dispatch to this
    # subclass's implementations.

    def __init__(self):
        """Initializes a MinioHandle"""
        super().__init__()
        # Underlying MinIO SDK client, built from CONFIG credentials.
        self.client: Minio = get_minio_client()
        # Bucket used when callers pass container_name=None.
        self.default_container_name = CONFIG.minio.bucket
        # Backend identifier string.
        self.backend = "s3"

    def _StorageHandle__provide_container(self, container_name):
        # Create the bucket on first use.
        if not self.client.bucket_exists(container_name):
            self.client.make_bucket(container_name)

    def _StorageHandle__add_file(self, path, storage_path, container_name=None):
        # Upload the local file at `path` under the object name `storage_path`.

        if container_name is None:
            container_name = self.default_container_name

        self._StorageHandle__provide_container(container_name)

        with open(path, "rb") as f:
            # put_object requires the exact byte length of the stream.
            stat = os.stat(path)
            self.client.put_object(container_name, storage_path, f, stat.st_size)

    def list_files(self, container_name=None) -> Iterable[str]:
        """List all files in a container.

        Args:
            container_name: container to list files from.

        Returns:
            Iterable of filenames.
        """
        return self._list_files("object_name", container_name=container_name)

    def get_objects(self, container_name=None):
        """Gets all objects in a container.

        Args:
            container_name: container to get objects from.

        Returns:
            Iterable over all objects in the container.
        """
        if container_name is None:
            container_name = self.default_container_name
        yield from self.client.list_objects(container_name, recursive=True)

    def _StorageHandle__list_containers(self):
        return self.client.list_buckets()

    def _StorageHandle__purge(self) -> None:
        """Deletes all files and containers in the store."""
        # Objects must be removed before their buckets can be deleted.
        for container, obj in self.get_all_objects():
            self.client.remove_object(container.name, obj.object_name)

        for container in self.client.list_buckets():
            self.client.remove_bucket(container.name)

    def _StorageHandle__fget_object(self, container_name, object_name, target_path):
        # Download a single object to a local file path.
        self.client.fget_object(container_name, object_name, target_path)

    def _StorageHandle__remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
        """Removes a file from the store.

        Args:
            folder: Folder containing file.
            filename: Name of file (without folder) to remove.
            container_name: container containing file.
        """
        if container_name is None:
            container_name = self.default_container_name

        path = os.path.join(folder, filename)

        # Removal of a missing object is silently skipped.
        if self.client.bucket_exists(container_name):
            self.client.remove_object(container_name, path)
|
||||
66
mini_queue/utils/rabbitmq.py
Normal file
66
mini_queue/utils/rabbitmq.py
Normal file
@ -0,0 +1,66 @@
|
||||
import json
|
||||
import logging
|
||||
import tempfile
|
||||
import time
|
||||
from operator import itemgetter
|
||||
|
||||
import pika
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
from mini_queue.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
|
||||
from mini_queue.utils.minio import MinioHandle
|
||||
|
||||
|
||||
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    """Open a channel on *connection* with the configured prefetch count applied."""
    channel = connection.channel()
    # Cap unacknowledged deliveries per consumer (fair dispatch across workers).
    channel.basic_qos(prefetch_count=CONFIG.rabbitmq.prefetch_count)
    return channel
|
||||
|
||||
|
||||
def declare_queue(channel, queue: str):
    """Declare *queue* as durable, dead-lettering failures to the configured queue."""
    arguments = {
        # "x-message-ttl": CONFIG.rabbitmq.message_ttl,
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter,
    }
    return channel.queue_declare(
        queue=queue,
        auto_delete=False,
        arguments=arguments,
        durable=True,
    )
|
||||
|
||||
|
||||
def make_callback():
    """Build the queue-consumer callback, binding a shared MinIO handle.

    Returns:
        A callback ``(channel, method, properties, body)`` that downloads the
        PDF referenced by the message, publishes a response on the output
        queue, and acks the incoming delivery.
    """

    # One storage client is shared across all deliveries handled by this callback.
    storage_client = MinioHandle()

    def callback(channel, method, properties, body):
        logging.info(f" [R] Received {body}")
        # BUG FIX: `process` previously returned `json.dumps(payload)` and the
        # result was json.dumps'ed again here, publishing a double-encoded
        # JSON string. `process` now returns the payload dict and it is
        # serialized exactly once.
        response = json.dumps(process(body))
        channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def process(payload):
        """Download and unpack the referenced PDF; return the response payload dict."""
        payload = json.loads(payload)
        with tempfile.TemporaryDirectory() as pdf_compressed_dir:
            with tempfile.TemporaryDirectory() as pdf_dir:

                dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
                time.sleep(0.4)
                object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
                time.sleep(0.6)
                downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
                time.sleep(0.3)
                # Unpacked only into a temporary dir; serves to validate the archive.
                unzip(downloaded_file_path, pdf_dir)
                time.sleep(1)
                payload["imageMetadata"] = []
                return payload

    return callback
|
||||
|
||||
|
||||
def read_connection_params():
    """Assemble pika connection parameters from the service CONFIG."""
    rabbit = CONFIG.rabbitmq
    return pika.ConnectionParameters(
        host=rabbit.host,
        port=rabbit.port,
        heartbeat=rabbit.heartbeat,
        credentials=pika.PlainCredentials(rabbit.user, rabbit.password),
    )
|
||||
179
mini_queue/utils/storage.py
Normal file
179
mini_queue/utils/storage.py
Normal file
@ -0,0 +1,179 @@
|
||||
import abc
|
||||
import gzip
|
||||
import logging
|
||||
import os
|
||||
from itertools import repeat
|
||||
from operator import attrgetter
|
||||
from typing import Iterable
|
||||
|
||||
from mini_queue.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
|
||||
from mini_queue.utils.meta import NoAttemptsLeft, max_attempts
|
||||
|
||||
|
||||
class StorageHandle:
    """Storage API base"""

    # NOTE(review): abstract hooks below use double-underscore names, so
    # Python mangles them to `_StorageHandle__<name>`. Subclasses therefore
    # override them by defining methods with the mangled name explicitly
    # (see MinioHandle). The class does not inherit from abc.ABC, so the
    # @abc.abstractmethod decorators are documentation only and are not
    # enforced at instantiation time.

    def __init__(self):
        # Container (bucket) used when callers pass container_name=None;
        # concrete subclasses set this.
        self.default_container_name = None

    @abc.abstractmethod
    def __provide_container(self, container_name):
        pass

    @abc.abstractmethod
    def __add_file(self, path, filename, container_name=None):
        pass

    @max_attempts(n_attempts=10, max_timeout=60)
    def add_file(self, path: str, folder: str = None, container_name: str = None) -> None:
        """Adds a file to the store.

        Args:
            path: Path to file to add to store.
            folder: Folder to hold file.
            container_name: container to hold file.
        """
        storage_path = self.__storage_path(path, folder=folder)
        self.__add_file(path, storage_path, container_name)

    @max_attempts()
    def _list_files(self, object_name_attr="object_name", container_name=None) -> Iterable[str]:
        """List all files in a container.

        Args:
            container_name: container to list files from.

        Returns:
            Iterable of filenames.
        """
        if container_name is None:
            container_name = self.default_container_name
        # Extract the backend-specific name attribute from each object.
        return map(attrgetter(object_name_attr), self.get_objects(container_name))

    @abc.abstractmethod
    def list_files(self, container_name=None) -> Iterable[str]:
        pass

    @abc.abstractmethod
    def get_objects(self, container_name=None):
        pass

    @abc.abstractmethod
    def __list_containers(self):
        pass

    @max_attempts()
    def get_all_objects(self) -> Iterable:
        """Gets all objects in the store

        Returns:
            Iterable over all objects in the store.
        """
        # Yields (container, object) pairs across every container.
        for container in self.__list_containers():
            yield from zip(repeat(container), self.get_objects(container.name))

    @abc.abstractmethod
    def __purge(self) -> None:
        pass

    @max_attempts()
    def purge(self) -> None:
        # Delete all files and containers in the store (delegates to subclass).
        self.__purge()

    def list_files_by_type(self, container_name=None, extension=".pdf.gz"):
        # Filter the container's file names by suffix.
        return filter(lambda p: p.endswith(extension), self.list_files(container_name))

    @abc.abstractmethod
    def __fget_object(self, *args, **kwargs):
        pass

    @staticmethod
    def __storage_path(path, folder: str = None):
        # Build the object name for a local path: basename, optionally
        # prefixed by a folder.
        def path_to_filename(path):
            return os.path.basename(path)

        storage_path = path_to_filename(path)
        if folder is not None:
            storage_path = os.path.join(folder, storage_path)

        return storage_path

    @max_attempts()
    def list_folders_and_files(self, container_name: str = None) -> Iterable[str]:
        """Lists pairs of folder name (dossier-IDs) and file name (file-IDs) of items in a container.

        Args:
            container_name: container to list items for.

        Returns:
            Iterable of pairs folder name (dossier-ID) and file names (file-ID)
        """
        # NOTE(review): assumes object names contain exactly one "/"; deeper
        # paths would yield more than two elements per pair — confirm.
        return map(lambda p: p.split("/"), self.list_files_by_type(container_name))

    @abc.abstractmethod
    def __remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
        pass

    @max_attempts()
    def remove_file(self, folder: str, filename: str, container_name: str = None) -> None:
        # Remove one file (delegates to subclass hook), retried on failure.
        self.__remove_file(folder, filename, container_name)

    def add_file_compressed(self, path, folder: str = None, container_name: str = None) -> None:
        """Adds a file as a .gz archive to the store.

        Args:
            path: Path to file to add to store.
            folder: Folder to hold file.
            container_name: container to hold file.
        """

        def compress(path_in: str, path_out: str):
            with open(path_in, "rb") as f_in, gzip.open(path_out, "wb") as f_out:
                f_out.writelines(f_in)

        # The .gz is written next to the original, uploaded, then deleted.
        path_gz = path_to_compressed_storage_pdf_object_name(path)
        compress(path, path_gz)

        self.add_file(path_gz, folder, container_name)
        os.unlink(path_gz)

    @max_attempts()
    def download_file(self, object_names: str, target_root_dir: str, container_name: str = None) -> str:
        """Downloads a file from the store.

        Args:
            object_names: Complete object name (folder and file).
            target_root_dir: Root directory to download file into (including its folder).
            container_name: container to load file from.

        Returns:
            str: Path to downloaded file.
        """

        # Inner download retries only on FileNotFoundError; the outer
        # max_attempts() retries any other transient failure.
        @max_attempts(5, exceptions=(FileNotFoundError,))
        def download(object_name: str) -> str:

            path, basename = os.path.split(object_name)
            target_dir = os.path.join(target_root_dir, path)
            provide_directory(target_dir)
            target_path = os.path.join(target_dir, basename)

            logging.log(msg=f"Downloading {object_name}...", level=logging.DEBUG)
            try:
                self.__fget_object(container_name, object_name, target_path)
                logging.log(msg=f"Downloaded {object_name}.", level=logging.DEBUG)
            except Exception as err:
                logging.log(msg=f"Downloading {object_name} failed.", level=logging.ERROR)
                raise err
            return target_path

        if container_name is None:
            container_name = self.default_container_name

        try:
            target_path = download(object_names)
        except NoAttemptsLeft as err:
            logging.log(msg=f"{err}", level=logging.ERROR)
            raise err

        return target_path
|
||||
@ -1,3 +1,4 @@
|
||||
pika
|
||||
retry
|
||||
envyaml
|
||||
minio
|
||||
|
||||
51
scripts/manage_minio.py
Normal file
51
scripts/manage_minio.py
Normal file
@ -0,0 +1,51 @@
|
||||
import argparse
|
||||
import os
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
from mini_queue.utils.minio import MinioHandle
|
||||
|
||||
|
||||
def parse_args(argv=None):
    """Parse command-line arguments for the MinIO management script.

    Args:
        argv: Optional explicit argument list; ``None`` (the default)
            preserves the original behavior of reading ``sys.argv``.

    Returns:
        argparse.Namespace with ``command`` plus the sub-command's options.
    """
    parser = argparse.ArgumentParser()

    subparsers = parser.add_subparsers(help="sub-command help", dest="command")

    parser_add = subparsers.add_parser("add", help="Add file(s) to the MinIO store")
    parser_add.add_argument("dossier_id")
    # Exactly one of --file / --directory must be given.
    add_group = parser_add.add_mutually_exclusive_group(required=True)
    add_group.add_argument("--file", "-f")
    add_group.add_argument("--directory", "-d")

    parser_remove = subparsers.add_parser("remove", help="Remove a file from the MinIO store")
    parser_remove.add_argument("dossier_id")
    parser_remove.add_argument("file_path")

    subparsers.add_parser("purge", help="Delete all files and buckets in the MinIO store")

    # Accepting argv makes the parser unit-testable without touching sys.argv.
    args = parser.parse_args(argv)
    return args
|
||||
|
||||
|
||||
if __name__ == "__main__":

    client = MinioHandle()

    args = parse_args()

    if args.command == "add":

        if args.file:
            # Single file: compress and upload under the dossier folder.
            client.add_file_compressed(args.file, folder=args.dossier_id)

        elif args.directory:
            # Whole directory: compress and upload each entry, with a progress bar.
            for fname in tqdm([*os.listdir(args.directory)], desc="Adding files"):
                path = os.path.join(args.directory, fname)
                client.add_file_compressed(path, folder=args.dossier_id)

    elif args.command == "remove":
        # Only the basename is used; the dossier id supplies the folder.
        fname = os.path.basename(args.file_path)
        client.remove_file(folder=args.dossier_id, filename=fname)

    elif args.command == "purge":
        # Delete all files and buckets in the store.
        client.purge()
|
||||
38
scripts/mock_knock.py
Normal file
38
scripts/mock_knock.py
Normal file
@ -0,0 +1,38 @@
|
||||
import json
|
||||
|
||||
import pika
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
from mini_queue.utils.minio import MinioHandle
|
||||
from mini_queue.utils.rabbitmq import make_channel, declare_queue
|
||||
|
||||
|
||||
def build_message_bodies():
    """Yield a JSON request body for every (dossier, file) pair in the store."""
    handle = MinioHandle()
    for dossier_id, pdf_name in handle.list_folders_and_files():
        # The file id is the object name up to its first dot.
        file_id, _, _ = pdf_name.partition(".")
        yield json.dumps({"dossierId": dossier_id, "fileId": file_id})
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Connection parameters are assembled from the service CONFIG.
    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
    parameters = pika.ConnectionParameters(
        host=CONFIG.rabbitmq.host,
        port=CONFIG.rabbitmq.port,
        heartbeat=CONFIG.rabbitmq.heartbeat,
        credentials=credentials,
    )

    connection = pika.BlockingConnection(parameters)
    channel = make_channel(connection)

    # Both queues must exist before publishing / consuming.
    declare_queue(channel, CONFIG.rabbitmq.queues.input)
    declare_queue(channel, CONFIG.rabbitmq.queues.output)

    # Publish one request per stored PDF ...
    for body in build_message_bodies():
        channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
        print(f" [x] Put {body} on {CONFIG.rabbitmq.queues.input}")

    # ... then print every response as it arrives (this loop never exits).
    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
        print(json.loads(body))
        channel.basic_ack(method_frame.delivery_tag)
|
||||
@ -1,30 +0,0 @@
|
||||
import json
|
||||
|
||||
import pika
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=CONFIG.rabbitmq.host,
|
||||
port=CONFIG.rabbitmq.port,
|
||||
heartbeat=CONFIG.rabbitmq.heartbeat,
|
||||
credentials=credentials,
|
||||
)
|
||||
body = json.dumps({"fileId": "234", "dossierId": "3403"})
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue=CONFIG.rabbitmq.queues.input, durable=True)
|
||||
channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
|
||||
|
||||
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
|
||||
|
||||
print(f" [x] Put {body} on {CONFIG.rabbitmq.queues.input}")
|
||||
|
||||
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
|
||||
print(json.loads(body))
|
||||
break
|
||||
Loading…
x
Reference in New Issue
Block a user