Pull request #2: Storage debugging
Merge in RR/mini_queue from storage_debugging to master
Squashed commit of the following:
commit 34b1353ebbf73c53891689be0fb533e857818b40
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Feb 16 13:47:18 2022 +0100
flask debug mode
commit a3726653ac7603c1b4f35b53745d010066a4eb2a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Feb 16 13:36:15 2022 +0100
running flask in seperate process
commit ec8dcc858cd03a30f009749dcb3ec40786dde0a3
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Feb 16 13:25:42 2022 +0100
added integrity checks
commit e4180478190c80d8ce2d83aadb0990563f56f497
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Feb 16 12:55:41 2022 +0100
refactoring; added debugging void callback
This commit is contained in:
parent
4edf04ab24
commit
503b506887
@ -25,3 +25,11 @@ service:
|
||||
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages
|
||||
logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log)
|
||||
name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster
|
||||
|
||||
webserver:
|
||||
host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address
|
||||
process_host: $SANIC_PROCESS_HOST|"127.0.0.1" # Sanic webserver host address for individual service processes
|
||||
port: $SANIC_PORT|8080 # Sanic webserver host port
|
||||
check_quantifier: $CHECK_QUANTIFIER|any # Whether all or any service instance needs to pass all checks for a passed master check
|
||||
cache: false # Whether to cache readiness and health check results
|
||||
logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING
|
||||
|
||||
@ -1,23 +1,53 @@
|
||||
import logging
|
||||
from multiprocessing import Process
|
||||
import pika
|
||||
from waitress import serve
|
||||
from flask import Flask, request, jsonify
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
from mini_queue.utils.rabbitmq import make_channel, declare_queue, make_callback, read_connection_params
|
||||
|
||||
|
||||
def start_integrity_checks_webserver(mode="debug"):
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.route("/ready", methods=["GET"])
|
||||
def ready():
|
||||
resp = jsonify("OK")
|
||||
resp.status_code = 200
|
||||
return resp
|
||||
|
||||
@app.route("/health", methods=["GET"])
|
||||
def healthy():
|
||||
resp = jsonify("OK")
|
||||
resp.status_code = 200
|
||||
return resp
|
||||
|
||||
if mode == "debug":
|
||||
app.run(host=CONFIG.webserver.host, port=CONFIG.webserver.port, debug=True)
|
||||
elif mode == "production":
|
||||
serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
logging.info(" [S] Startet happy pikachu!")
|
||||
logging.info("Starting mini-queue...")
|
||||
|
||||
parameters = read_connection_params()
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = make_channel(connection)
|
||||
declare_queue(channel, CONFIG.rabbitmq.queues.input)
|
||||
|
||||
logging.info("Starting webserver...")
|
||||
|
||||
p = Process(target=start_integrity_checks_webserver, args=("debug", ))
|
||||
p.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
channel.basic_consume(
|
||||
queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback()
|
||||
queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback("dummy")
|
||||
)
|
||||
logging.info(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
channel.start_consuming()
|
||||
@ -32,6 +62,8 @@ def main():
|
||||
logging.info("Connection was closed, retrying...")
|
||||
continue
|
||||
|
||||
p.join()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging_level = CONFIG.service.logging_level
|
||||
|
||||
@ -26,7 +26,30 @@ def declare_queue(channel, queue: str):
|
||||
return channel.queue_declare(queue=queue, auto_delete=False, arguments=args, durable=True)
|
||||
|
||||
|
||||
def make_callback():
|
||||
def make_dummy_callback():
|
||||
|
||||
def callback(channel, method, properties, body):
|
||||
logging.info(f" [R] Received {body}")
|
||||
response = json.dumps(process(body))
|
||||
channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
|
||||
channel.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
def process(payload):
|
||||
payload = json.loads(payload)
|
||||
payload["imageMetadata"] = []
|
||||
return payload
|
||||
|
||||
return callback
|
||||
|
||||
|
||||
def make_callback(name):
|
||||
if name == "storage_debug":
|
||||
return make_storage_debug_callback()
|
||||
elif name == "dummy":
|
||||
return make_dummy_callback()
|
||||
|
||||
|
||||
def make_storage_debug_callback():
|
||||
|
||||
storage_client = MinioHandle()
|
||||
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
pika
|
||||
retry
|
||||
envyaml
|
||||
minio
|
||||
pika==1.2.0
|
||||
retry==0.9.2
|
||||
envyaml==1.8.210417
|
||||
minio==7.1.1
|
||||
Flask==2.0.2
|
||||
waitress==2.0.0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user