This commit is contained in:
Julius Unverfehrt 2022-02-14 12:05:40 +01:00
parent 0b26ab92a8
commit 4507bbb4e6
8 changed files with 149 additions and 29 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
.venv
__pycache__
data/

57
config.yaml Normal file
View File

@@ -0,0 +1,57 @@
rabbitmq:
host: $RABBITMQ_HOST|localhost # RabbitMQ host address
port: $RABBITMQ_PORT|5672 # RabbitMQ host port
user: $RABBITMQ_USERNAME|user # RabbitMQ username
password: $RABBITMQ_PASSWORD|bitnami # RabbitMQ password
heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds
queues: # Names of queues for...
input: input_queue # requests to service
output: response_queue # responses by service
dead_letter: dead_letter_queue # messages that failed to process
prefetch_count: 1
retry: # Controls retry behaviour for messages whose processing failed
enabled: $RETRY|True # Toggles retry behaviour
max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue
# TODO: implement
max_interval: $MAX_INTERVAL|15000 # Increases the retry timeout each time a message fails, up to this maximum value
minio:
host: $STORAGE_ENDPOINT|localhost # MinIO host address
port: $STORAGE_PORT|9000 # MinIO host port
user: $STORAGE_KEY|root # MinIO user name
password: $STORAGE_SECRET|password # MinIO user password
bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket
azure_blob_storage:
# NOTE(review): a real Azure AccountKey is committed as the default below — rotate the key and supply it only via STORAGE_AZURECONNECTIONSTRING
connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
container: $STORAGE_AZURECONTAINERNAME|"image-service-v2-test-data"
sanic:
host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address
process_host: $SANIC_PROCESS_HOST|"127.0.0.1" # Sanic webserver host address for individual service processes
port: $SANIC_PORT|8080 # Sanic webserver host port
check_quantifier: $CHECK_QUANTIFIER|any # Whether all or any service instance needs to pass all checks for a passed master check
cache: false # Whether to cache readiness and health check results
logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages
logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log)
verbose: $VERBOSE|True # Service workers print document processing progress to stdout
assert_gpu: $ASSERT_GPU|False # Whether to make a working GPU a mandatory readiness condition
run_id: $RUN_ID|fabfb1f192c745369b88cab34471aba7 # The ID of the mlflow run to load the model from
n_instances: $CONCURRENCY|1 # Number of service top loops that run in parallel (processes, not threads!)
name: $SERVICE_NAME|image-service-v2 # Name of the service in the kubernetes cluster
storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from
model_cache_file: "/root/.keras/models/efficientnetb0_notop.h5" # Workaround to intercept auto-download if model is not cached
batch_size: $BATCH_SIZE|32 # Number of images in memory simultaneously per service instance
minimum_free_memory_percentage: $MINIMUM_FREE_MEMORY_PERCENTAGE|.3 # Minimum allowed percentage of free memory
available_memory: $AVAILABLE_MEMORY|6000 # Available memory in MB
monitor_memory_usage: $MONITOR_MEMORY_USAGE|True # Whether to monitor the memory usage and kill the process when memory is insufficient
# NOTE(review): license key committed in plain text — consider loading it from an environment variable like the other secrets above
pdftron_license_key: "Knecon AG(en.knecon.swiss):OEM:DDA-R::WL+:AMS(20211029):BECC974307DAB4F34B513BC9B2531B24496F6FCB83CD8AC574358A959730B622FABEF5C7"

27
mini_queue/consumer.py Normal file
View File

@@ -0,0 +1,27 @@
import pika
from mini_queue.utils.config import CONFIG
def get_message(parameters):
    """Consume messages from the configured input queue until interrupted.

    Blocks forever in ``start_consuming``; exit with CTRL+C.

    Args:
        parameters: pika.ConnectionParameters used to open the broker connection.
    """

    def callback(ch, method, properties, body):
        # Minimal handler: just echo the raw payload.
        print(" [x] Received %r" % body)

    # Fix: the original also read CONFIG.rabbitmq.queues.output into an
    # unused local (out_queue); only the input queue is consumed here.
    inp_queue = CONFIG.rabbitmq.queues.input
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    # durable=True so the queue survives a broker restart.
    channel.queue_declare(queue=inp_queue, durable=True)
    # NOTE(review): auto_ack=True acknowledges before the callback runs, so a
    # crash in the callback loses the message — confirm this is acceptable.
    channel.basic_consume(queue=inp_queue,
                          auto_ack=True,
                          on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
# Script entry point: build broker connection parameters from the central
# CONFIG (values env-overridable via $VAR|default in config.yaml) and consume.
if __name__ == "__main__":
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials)
get_message(parameters)

View File

@@ -1,24 +0,0 @@
import pika
# NOTE(review): this hunk is the pre-refactor version shown as *removed* in
# the diff; broker address, queue name, and credentials were hardcoded here
# and are replaced by CONFIG lookups in the new file above.
def get_message(parameters):
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="test", durable=True)
channel.basic_consume(queue='test',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
# NOTE(review): hardcoded credentials and a raw container IP — superseded by
# the config-driven entry point in the new version.
if __name__ == "__main__":
credentials = pika.PlainCredentials('user', 'bitnami')
parameters = pika.ConnectionParameters(host="172.17.0.2", port=5672, heartbeat=0, credentials=credentials)
get_message(parameters)

View File

@@ -1,19 +1,21 @@
import pika
from mini_queue.utils.config import CONFIG
# NOTE(review): this section is a unified diff rendered WITHOUT +/- markers,
# so removed and added statements are interleaved below (two queue_declare
# calls, a 4-argument basic_publish, duplicate credential/parameter
# assignments). It is not runnable as-is; consult the original diff to
# separate the old and new versions.
def put_message(parameters):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="test", durable=True)
inp_queue = CONFIG.rabbitmq.queues.input
channel.queue_declare(queue=inp_queue, durable=True)
channel.basic_publish('',
'test',
inp_queue,
'Hello Wald!')
print(" [x] Sent 'Hello World!'")
print(" [x] Sent something'")
connection.close()
if __name__ == "__main__":
credentials = pika.PlainCredentials('user', 'bitnami')
parameters = pika.ConnectionParameters(host="172.17.0.2", port=5672, heartbeat=1000, credentials=credentials)
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials)
put_message(parameters)

View File

View File

@@ -0,0 +1,40 @@
"""Implements a config object with dot-indexing syntax."""
from envyaml import EnvYAML
from mini_queue.utils.locations import CONFIG_FILE
def _get_item_and_maybe_make_dotindexable(container, item):
    """Return ``container[item]``, wrapping plain dicts so nested mappings
    also support dot-indexing."""
    value = container[item]
    return DotIndexable(value) if isinstance(value, dict) else value


class DotIndexable:
    """Thin wrapper around a mapping that allows attribute-style access.

    ``d.key`` and ``d["key"]`` both delegate to the wrapped mapping; nested
    dict values are wrapped recursively on access.
    """

    def __init__(self, x):
        # x: the wrapped mapping (assumed dict-like — indexable by key).
        self.x = x

    def __getattr__(self, item):
        # Only called when normal attribute lookup fails (so self.x itself
        # never recurses through here).
        try:
            return _get_item_and_maybe_make_dotindexable(self.x, item)
        except KeyError as err:
            # Fix: __getattr__ must raise AttributeError (not KeyError) so
            # hasattr(), copy, and pickle behave correctly.
            raise AttributeError(item) from err

    def __getitem__(self, key):
        # Fix: mirror __setitem__ so bracket *reads* work symmetrically with
        # bracket writes (the original only defined __setitem__).
        return _get_item_and_maybe_make_dotindexable(self.x, key)

    def __setitem__(self, key, value):
        self.x[key] = value

    def __repr__(self):
        return self.x.__repr__()
class Config:
    """Read-only view over a YAML config file with dot- and bracket-indexing.

    Nested dict values are wrapped via ``_get_item_and_maybe_make_dotindexable``
    so ``CONFIG.rabbitmq.queues.input`` style chains work.
    """

    def __init__(self, config_path):
        # EnvYAML resolves $ENV_VAR|default placeholders while loading.
        self.__config = EnvYAML(config_path)

    def __getattr__(self, item):
        # Only called when normal attribute lookup fails; the private
        # __config attribute itself is found in __dict__ and never recurses.
        if item in self.__config:
            return _get_item_and_maybe_make_dotindexable(self.__config, item)
        # Fix: the original fell through and implicitly returned None for
        # unknown keys, which masks config typos and makes hasattr() always
        # true. Raise AttributeError per the __getattr__ contract.
        raise AttributeError(item)

    def __getitem__(self, item):
        # Bracket access delegates to attribute access (same wrapping rules).
        return self.__getattr__(item)
# Module-level singleton: the config is parsed once at import time from the
# repo-root config.yaml (path supplied by mini_queue.utils.locations).
CONFIG = Config(CONFIG_FILE)

View File

@@ -0,0 +1,15 @@
"""Defines constant paths relative to the module root path."""
from os import path
MODULE_DIR = path.dirname(path.abspath(__file__))
PACKAGE_ROOT_DIR = path.dirname(MODULE_DIR)
REPO_ROOT_DIR = path.dirname(PACKAGE_ROOT_DIR)
DOCKER_COMPOSE_FILE = path.join(REPO_ROOT_DIR, "docker-compose.yaml")
CONFIG_FILE = path.join(REPO_ROOT_DIR, "config.yaml")
LOG_FILE = "/tmp/log.log"
DATA_DIR = path.join(PACKAGE_ROOT_DIR, "data")