diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2c82e04 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.venv +__pycache__ +data/ diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..f2af8f2 --- /dev/null +++ b/config.yaml @@ -0,0 +1,57 @@ +rabbitmq: + + host: $RABBITMQ_HOST|localhost # RabbitMQ host address + port: $RABBITMQ_PORT|5672 # RabbitMQ host port + user: $RABBITMQ_USERNAME|user # RabbitMQ username + password: $RABBITMQ_PASSWORD|bitnami # RabbitMQ password + heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds + + queues: # Names of queues for... + input: input_queue # requests to service + output: response_queue # responses by service + dead_letter: dead_letter_queue # messages that failed to process + + prefetch_count: 1 + + retry: # Controls retry behaviour for messages the processing of which failed + enabled: $RETRY|True # Toggles retry behaviour + max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue +# TODO: implement + max_interval: $MAX_INTERVAL|15000 # Increase timeout for a message every time it fails to a maximum of this value + +minio: + host: $STORAGE_ENDPOINT|localhost # MinIO host address + port: $STORAGE_PORT|9000 # MinIO host port + user: $STORAGE_KEY|root # MinIO user name + password: $STORAGE_SECRET|password # MinIO user password + bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket + +azure_blob_storage: + connection_string: $STORAGE_AZURECONNECTIONSTRING|"" # SECURITY(review): removed committed connection string containing a live AccountKey; rotate the leaked key and inject the value via environment variable + container: $STORAGE_AZURECONTAINERNAME|"image-service-v2-test-data" + +sanic: + host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address + process_host: $SANIC_PROCESS_HOST|"127.0.0.1" # Sanic webserver host address for individual service processes + port: 
$SANIC_PORT|8080 # Sanic webserver host port + check_quantifier: $CHECK_QUANTIFIER|any # Whether all or any service instance needs to pass all checks for a passed master check + cache: false # Whether to cache readiness and health check results + logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING + +service: + + logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages + logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log) + verbose: $VERBOSE|True # Service workers print document processing progress to stdout + assert_gpu: $ASSERT_GPU|False # Whether to make a working GPU a mandatory readiness condition + run_id: $RUN_ID|fabfb1f192c745369b88cab34471aba7 # The ID of the mlflow run to load the model from + n_instances: $CONCURRENCY|1 # Number of service top loops that run in parallel (processes, not threads!) + name: $SERVICE_NAME|image-service-v2 # Name of the service in the kubernetes cluster + storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from + model_cache_file: "/root/.keras/models/efficientnetb0_notop.h5" # Workaround to intercept auto-download if model is not cached + batch_size: $BATCH_SIZE|32 # Number of images in memory simultaneously per service instance + minimum_free_memory_percentage: $MINIMUM_FREE_MEMORY_PERCENTAGE|.3 # Minimum allowed percentage of free memory + available_memory: $AVAILABLE_MEMORY|6000 # Available memory in MB + monitor_memory_usage: $MONITOR_MEMORY_USAGE|True # Whether to monitor the memory usage and kill the process when memory is insufficient + +pdftron_license_key: $PDFTRON_LICENSE_KEY|"" # SECURITY(review): removed committed PDFTron license key; rotate it and inject via environment variable diff --git a/mini_queue/consumer.py b/mini_queue/consumer.py new file mode 100644 index 0000000..c81aff8 --- /dev/null +++ b/mini_queue/consumer.py @@ -0,0 +1,27 @@ +import pika + +from mini_queue.utils.config import CONFIG + + 
+def get_message(parameters): + def callback(ch, method, properties, body): + print(" [x] Received %r" % body) + inp_queue = CONFIG.rabbitmq.queues.input + out_queue = CONFIG.rabbitmq.queues.output + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=inp_queue, durable=True) + channel.basic_consume(queue=inp_queue, + auto_ack=True, + on_message_callback=callback) + print(' [*] Waiting for messages. To exit press CTRL+C') + channel.start_consuming() + + +if __name__ == "__main__": + + credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) + parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials) + get_message(parameters) + + diff --git a/mini_queue/get_from_queue.py b/mini_queue/get_from_queue.py deleted file mode 100644 index 56b0537..0000000 --- a/mini_queue/get_from_queue.py +++ /dev/null @@ -1,24 +0,0 @@ -import pika - - -def get_message(parameters): - def callback(ch, method, properties, body): - print(" [x] Received %r" % body) - - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue="test", durable=True) - channel.basic_consume(queue='test', - auto_ack=True, - on_message_callback=callback) - print(' [*] Waiting for messages. 
To exit press CTRL+C') - channel.start_consuming() - - -if __name__ == "__main__": - - credentials = pika.PlainCredentials('user', 'bitnami') - parameters = pika.ConnectionParameters(host="172.17.0.2", port=5672, heartbeat=0, credentials=credentials) - get_message(parameters) - - diff --git a/mini_queue/put_on_queue.py b/mini_queue/put_on_queue.py index e2f6276..0db966a 100644 --- a/mini_queue/put_on_queue.py +++ b/mini_queue/put_on_queue.py @@ -1,19 +1,21 @@ import pika +from mini_queue.utils.config import CONFIG def put_message(parameters): connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.queue_declare(queue="test", durable=True) + inp_queue = CONFIG.rabbitmq.queues.input + channel.queue_declare(queue=inp_queue, durable=True) channel.basic_publish('', - 'test', + inp_queue, 'Hello Wald!') - print(" [x] Sent 'Hello World!'") + print(" [x] Sent something") connection.close() if __name__ == "__main__": - credentials = pika.PlainCredentials('user', 'bitnami') - parameters = pika.ConnectionParameters(host="172.17.0.2", port=5672, heartbeat=1000, credentials=credentials) + credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) + parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials) + put_message(parameters) \ No newline at end of file diff --git a/mini_queue/utils/__init__.py b/mini_queue/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mini_queue/utils/config.py b/mini_queue/utils/config.py new file mode 100644 index 0000000..c04a5e1 --- /dev/null +++ b/mini_queue/utils/config.py @@ -0,0 +1,40 @@ +"""Implements a config object with dot-indexing syntax.""" + + +from envyaml import EnvYAML + +from mini_queue.utils.locations import CONFIG_FILE + + +def _get_item_and_maybe_make_dotindexable(container, item): + ret = container[item] + return DotIndexable(ret) if isinstance(ret, 
dict) else ret + + +class DotIndexable: + def __init__(self, x): + self.x = x + + def __getattr__(self, item): + return _get_item_and_maybe_make_dotindexable(self.x, item) + + def __setitem__(self, key, value): + self.x[key] = value + + def __repr__(self): + return self.x.__repr__() + + +class Config: + def __init__(self, config_path): + self.__config = EnvYAML(config_path) + + def __getattr__(self, item): + if item in self.__config: + return _get_item_and_maybe_make_dotindexable(self.__config, item) + + def __getitem__(self, item): + return self.__getattr__(item) + + +CONFIG = Config(CONFIG_FILE) diff --git a/mini_queue/utils/locations.py b/mini_queue/utils/locations.py new file mode 100644 index 0000000..0e34b40 --- /dev/null +++ b/mini_queue/utils/locations.py @@ -0,0 +1,15 @@ +"""Defines constant paths relative to the module root path.""" + + +from os import path + +MODULE_DIR = path.dirname(path.abspath(__file__)) +PACKAGE_ROOT_DIR = path.dirname(MODULE_DIR) +REPO_ROOT_DIR = path.dirname(PACKAGE_ROOT_DIR) + +DOCKER_COMPOSE_FILE = path.join(REPO_ROOT_DIR, "docker-compose.yaml") + +CONFIG_FILE = path.join(REPO_ROOT_DIR, "config.yaml") +LOG_FILE = "/tmp/log.log" + +DATA_DIR = path.join(PACKAGE_ROOT_DIR, "data")