From 503b5068872dc11ad70cdea09a93a2964e9fec75 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 16 Feb 2022 14:32:19 +0100 Subject: [PATCH] Pull request #2: Storage debugging Merge in RR/mini_queue from storage_debugging to master Squashed commit of the following: commit 34b1353ebbf73c53891689be0fb533e857818b40 Author: Matthias Bisping Date: Wed Feb 16 13:47:18 2022 +0100 flask debug mode commit a3726653ac7603c1b4f35b53745d010066a4eb2a Author: Matthias Bisping Date: Wed Feb 16 13:36:15 2022 +0100 running flask in seperate process commit ec8dcc858cd03a30f009749dcb3ec40786dde0a3 Author: Matthias Bisping Date: Wed Feb 16 13:25:42 2022 +0100 added integrity checks commit e4180478190c80d8ce2d83aadb0990563f56f497 Author: Matthias Bisping Date: Wed Feb 16 12:55:41 2022 +0100 refactoring; added debugging void callback --- config.yaml | 8 ++++++++ mini_queue/run.py | 36 ++++++++++++++++++++++++++++++++++-- mini_queue/utils/rabbitmq.py | 25 ++++++++++++++++++++++++- requirements.txt | 10 ++++++---- 4 files changed, 72 insertions(+), 7 deletions(-) diff --git a/config.yaml b/config.yaml index 451d999..8dfbd51 100755 --- a/config.yaml +++ b/config.yaml @@ -25,3 +25,11 @@ service: logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log) name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster + +webserver: + host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address + process_host: $SANIC_PROCESS_HOST|"127.0.0.1" # Sanic webserver host address for individual service processes + port: $SANIC_PORT|8080 # Sanic webserver host port + check_quantifier: $CHECK_QUANTIFIER|any # Whether all or any service instance needs to pass all checks for a passed master check + cache: false # Whether to cache readiness and health check results + logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING diff --git a/mini_queue/run.py b/mini_queue/run.py index e598daa..319ae64 100644 --- a/mini_queue/run.py +++ b/mini_queue/run.py @@ -1,23 +1,53 @@ import logging +from multiprocessing import Process import pika +from waitress import serve +from flask import Flask, request, jsonify from mini_queue.utils.config import CONFIG from mini_queue.utils.rabbitmq import make_channel, declare_queue, make_callback, read_connection_params +def start_integrity_checks_webserver(mode="debug"): + + app = Flask(__name__) + + @app.route("/ready", methods=["GET"]) + def ready(): + resp = jsonify("OK") + resp.status_code = 200 + return resp + + @app.route("/health", methods=["GET"]) + def healthy(): + resp = jsonify("OK") + resp.status_code = 200 + return resp + + if mode == "debug": + app.run(host=CONFIG.webserver.host, port=CONFIG.webserver.port, debug=True) + elif mode == "production": + serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port) + + def main(): - logging.info(" [S] Startet happy pikachu!") + logging.info("Starting mini-queue...") parameters = read_connection_params() connection = pika.BlockingConnection(parameters) channel = make_channel(connection) declare_queue(channel, CONFIG.rabbitmq.queues.input) + logging.info("Starting webserver...") + + p = Process(target=start_integrity_checks_webserver, args=("debug", )) + p.start() + while True: try: channel.basic_consume( - queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback() + queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback("dummy") ) logging.info(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming() @@ -32,6 +62,8 @@ def main(): logging.info("Connection was closed, retrying...") continue + p.join() + if __name__ == "__main__": logging_level = CONFIG.service.logging_level diff --git a/mini_queue/utils/rabbitmq.py b/mini_queue/utils/rabbitmq.py index e362895..a69b47f 100644 --- a/mini_queue/utils/rabbitmq.py +++ b/mini_queue/utils/rabbitmq.py @@ -26,7 +26,30 @@ def declare_queue(channel, queue: str): return channel.queue_declare(queue=queue, auto_delete=False, arguments=args, durable=True) -def make_callback(): +def make_dummy_callback(): + + def callback(channel, method, properties, body): + logging.info(f" [R] Received {body}") + response = json.dumps(process(body)) + channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response) + channel.basic_ack(delivery_tag=method.delivery_tag) + + def process(payload): + payload = json.loads(payload) + payload["imageMetadata"] = [] + return payload + + return callback + + +def make_callback(name): + if name == "storage_debug": + return make_storage_debug_callback() + elif name == "dummy": + return make_dummy_callback() + + +def make_storage_debug_callback(): storage_client = MinioHandle() diff --git a/requirements.txt b/requirements.txt index 780fe93..2b2cfa6 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ -pika -retry -envyaml -minio +pika==1.2.0 +retry==0.9.2 +envyaml==1.8.210417 +minio==7.1.1 +Flask==2.0.2 +waitress==2.0.0