Pull request #21: refactoring of prometheus tunneling
Merge in RR/pyinfra from add-prometheus-metrics to master
Squashed commit of the following:
commit 3736e867bfb105f2c2601f6d25343c996027cc5f
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Mar 16 11:35:25 2022 +0100
removed obsolete config entry
commit dc191b17d863ec4f8009fb130c2c3a78d4116969
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Mar 16 11:34:12 2022 +0100
removed obsolete dependency
commit 5ba9765e88da7dd15700b211794f433a6f7ea0df
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Mar 16 11:32:37 2022 +0100
changed error handling for prometheus endpoint
commit 894a6b5d4c7026b9a703a8b2cd70641e7ed7323b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Mar 16 11:16:39 2022 +0100
fixed definition order broken by auto-refac; reduced prometheus code to only forwarding to analysis endpoint
commit 9f3c884c75289c7b558e8cc8fb0154b5ddd3a323
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Mar 16 08:59:45 2022 +0100
black is back
commit 5950799e03f3578ff58f19430494c6b0c223c0f6
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Mar 16 08:59:11 2022 +0100
add prometheus memory peak monitoring, combine report with analysis report
This commit is contained in:
parent
f5b7203778
commit
391bcc482f
@ -1,11 +1,16 @@
|
||||
import logging
|
||||
|
||||
import requests
|
||||
from flask import Flask, jsonify
|
||||
from retry import retry
|
||||
from waitress import serve
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
logger.setLevel(CONFIG.service.logging_level)
|
||||
|
||||
|
||||
def run_probing_webserver(app, host=None, port=None, mode=None):
|
||||
if not host:
|
||||
host = CONFIG.probing_webserver.host
|
||||
@ -24,6 +29,7 @@ def run_probing_webserver(app, host=None, port=None, mode=None):
|
||||
|
||||
|
||||
def set_up_probing_webserver():
|
||||
# TODO: implement meaningful checks
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.route("/ready", methods=["GET"])
|
||||
@ -39,14 +45,13 @@ def set_up_probing_webserver():
|
||||
return resp
|
||||
|
||||
@app.route("/prometheus", methods=["GET"])
|
||||
def get_analysis_prometheus_endpoint():
|
||||
@retry(requests.exceptions.ConnectionError, tries=3, delay=5, jitter=(1, 3))
|
||||
def inner():
|
||||
prom_endpoint = f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus"
|
||||
metric = requests.get(prom_endpoint)
|
||||
metric.raise_for_status()
|
||||
return metric.text
|
||||
|
||||
return inner()
|
||||
def get_metrics_from_analysis_endpoint():
|
||||
try:
|
||||
resp = requests.get(f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus")
|
||||
resp.raise_for_status()
|
||||
return resp.text
|
||||
except Exception as err:
|
||||
logger.warning(f"Got no metrics from analysis prometheus endpoint: {err}")
|
||||
return resp
|
||||
|
||||
return app
|
||||
|
||||
@ -26,6 +26,16 @@ def parse_args():
|
||||
return args
|
||||
|
||||
|
||||
def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension):
|
||||
return f"{dossier_id}/{file_id}{extension}"
|
||||
|
||||
|
||||
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
|
||||
data = gzip.compress(result.encode())
|
||||
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
|
||||
storage.put_object(bucket_name, path_gz, data)
|
||||
|
||||
|
||||
def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
|
||||
|
||||
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, ".ORIGIN.pdf.gz")
|
||||
@ -57,13 +67,3 @@ if __name__ == "__main__":
|
||||
|
||||
elif args.command == "purge":
|
||||
storage.clear_bucket(bucket_name)
|
||||
|
||||
|
||||
def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension):
|
||||
return f"{dossier_id}/{file_id}{extension}"
|
||||
|
||||
|
||||
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
|
||||
data = gzip.compress(result.encode())
|
||||
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
|
||||
storage.put_object(bucket_name, path_gz, data)
|
||||
|
||||
@ -6,6 +6,34 @@ from pyinfra.config import CONFIG
|
||||
from pyinfra.storage.storages import get_s3_storage
|
||||
|
||||
|
||||
def read_connection_params():
|
||||
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=CONFIG.rabbitmq.host,
|
||||
port=CONFIG.rabbitmq.port,
|
||||
heartbeat=CONFIG.rabbitmq.heartbeat,
|
||||
credentials=credentials,
|
||||
)
|
||||
return parameters
|
||||
|
||||
|
||||
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
|
||||
channel = connection.channel()
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
return channel
|
||||
|
||||
|
||||
def declare_queue(channel, queue: str):
|
||||
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
|
||||
return channel.queue_declare(queue=queue, auto_delete=False, arguments=args)
|
||||
|
||||
|
||||
def make_connection() -> pika.BlockingConnection:
|
||||
parameters = read_connection_params()
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
return connection
|
||||
|
||||
|
||||
def build_message_bodies():
|
||||
storage = get_s3_storage()
|
||||
for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket):
|
||||
@ -21,23 +49,6 @@ def build_message_bodies():
|
||||
).encode()
|
||||
|
||||
|
||||
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
|
||||
channel = connection.channel()
|
||||
channel.basic_qos(prefetch_count=CONFIG.rabbitmq.prefetch_count)
|
||||
return channel
|
||||
|
||||
|
||||
def declare_queue(channel, queue: str):
|
||||
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
|
||||
return channel.queue_declare(queue=queue, auto_delete=False, arguments=args)
|
||||
|
||||
|
||||
def make_connection() -> pika.BlockingConnection:
|
||||
parameters = read_connection_params()
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
return connection
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
connection = make_connection()
|
||||
@ -52,14 +63,3 @@ if __name__ == "__main__":
|
||||
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
|
||||
print(f"Received {json.loads(body)}")
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
|
||||
|
||||
def read_connection_params():
|
||||
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=CONFIG.rabbitmq.host,
|
||||
port=CONFIG.rabbitmq.port,
|
||||
heartbeat=CONFIG.rabbitmq.heartbeat,
|
||||
credentials=credentials,
|
||||
)
|
||||
return parameters
|
||||
|
||||
@ -37,9 +37,9 @@ def make_callback(analysis_endpoint):
|
||||
|
||||
|
||||
def main():
|
||||
# TODO: implement meaningful checks
|
||||
logging.info(make_art())
|
||||
|
||||
webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),))
|
||||
logging.info(make_art())
|
||||
logging.info("Starting webserver...")
|
||||
webserver.start()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user