From 391bcc482f69215592b5eeeb5ef5db851336e64c Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Wed, 16 Mar 2022 11:37:37 +0100 Subject: [PATCH] Pull request #21: refactoring of prometheus tunneling Merge in RR/pyinfra from add-prometheus-metrics to master Squashed commit of the following: commit 3736e867bfb105f2c2601f6d25343c996027cc5f Author: Matthias Bisping Date: Wed Mar 16 11:35:25 2022 +0100 removed obsolete config entry commit dc191b17d863ec4f8009fb130c2c3a78d4116969 Author: Matthias Bisping Date: Wed Mar 16 11:34:12 2022 +0100 removed obsolete dependency commit 5ba9765e88da7dd15700b211794f433a6f7ea0df Author: Matthias Bisping Date: Wed Mar 16 11:32:37 2022 +0100 changed error handling for prometheus endpoint commit 894a6b5d4c7026b9a703a8b2cd70641e7ed7323b Author: Matthias Bisping Date: Wed Mar 16 11:16:39 2022 +0100 fixed definition order broken by auto-refac; reduced prometheus code to only forwarding to analysis endpoint commit 9f3c884c75289c7b558e8cc8fb0154b5ddd3a323 Author: Julius Unverfehrt Date: Wed Mar 16 08:59:45 2022 +0100 black is back commit 5950799e03f3578ff58f19430494c6b0c223c0f6 Author: Julius Unverfehrt Date: Wed Mar 16 08:59:11 2022 +0100 add prometheus memory peak monitoring, combine report with analysis report --- pyinfra/flask.py | 25 ++++++++++-------- scripts/manage_minio.py | 20 +++++++-------- scripts/mock_client.py | 56 ++++++++++++++++++++--------------------- src/serve.py | 4 +-- 4 files changed, 55 insertions(+), 50 deletions(-) diff --git a/pyinfra/flask.py b/pyinfra/flask.py index 81e8b94..35e3d3a 100644 --- a/pyinfra/flask.py +++ b/pyinfra/flask.py @@ -1,11 +1,16 @@ +import logging + import requests from flask import Flask, jsonify -from retry import retry from waitress import serve from pyinfra.config import CONFIG +logger = logging.getLogger(__file__) +logger.setLevel(CONFIG.service.logging_level) + + def run_probing_webserver(app, host=None, port=None, mode=None): if not host: host = CONFIG.probing_webserver.host @@ -24,6 +29,7 @@ def run_probing_webserver(app, host=None, port=None, mode=None): def set_up_probing_webserver(): + # TODO: implement meaningful checks app = Flask(__name__) @app.route("/ready", methods=["GET"]) @@ -39,14 +45,13 @@ def set_up_probing_webserver(): return resp @app.route("/prometheus", methods=["GET"]) - def get_analysis_prometheus_endpoint(): - @retry(requests.exceptions.ConnectionError, tries=3, delay=5, jitter=(1, 3)) - def inner(): - prom_endpoint = f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus" - metric = requests.get(prom_endpoint) - metric.raise_for_status() - return metric.text - - return inner() + def get_metrics_from_analysis_endpoint(): + try: + resp = requests.get(f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus") + resp.raise_for_status() + return resp.text + except Exception as err: + logger.warning(f"Got no metrics from analysis prometheus endpoint: {err}") + return resp return app diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 4528a12..3a72632 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -26,6 +26,16 @@ def parse_args(): return args +def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension): + return f"{dossier_id}/{file_id}{extension}" + + +def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: + data = gzip.compress(result.encode()) + path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) + storage.put_object(bucket_name, path_gz, data) + + def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, ".ORIGIN.pdf.gz") @@ -57,13 +67,3 @@ if __name__ == "__main__": elif args.command == "purge": storage.clear_bucket(bucket_name) - - -def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension): - return f"{dossier_id}/{file_id}{extension}" - - -def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: - data = gzip.compress(result.encode()) - path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) - storage.put_object(bucket_name, path_gz, data) diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 938561c..3590d3e 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -6,6 +6,34 @@ from pyinfra.config import CONFIG from pyinfra.storage.storages import get_s3_storage +def read_connection_params(): + credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) + parameters = pika.ConnectionParameters( + host=CONFIG.rabbitmq.host, + port=CONFIG.rabbitmq.port, + heartbeat=CONFIG.rabbitmq.heartbeat, + credentials=credentials, + ) + return parameters + + +def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: + channel = connection.channel() + channel.basic_qos(prefetch_count=1) + return channel + + +def declare_queue(channel, queue: str): + args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter} + return channel.queue_declare(queue=queue, auto_delete=False, arguments=args) + + +def make_connection() -> pika.BlockingConnection: + parameters = read_connection_params() + connection = pika.BlockingConnection(parameters) + return connection + + def build_message_bodies(): storage = get_s3_storage() for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket): @@ -21,23 +49,6 @@ def build_message_bodies(): ).encode() -def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: - channel = connection.channel() - channel.basic_qos(prefetch_count=CONFIG.rabbitmq.prefetch_count) - return channel - - -def declare_queue(channel, queue: str): - args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter} - return channel.queue_declare(queue=queue, auto_delete=False, arguments=args) - - -def make_connection() -> pika.BlockingConnection: - parameters = read_connection_params() - connection = pika.BlockingConnection(parameters) - return connection - - if __name__ == "__main__": connection = make_connection() @@ -52,14 +63,3 @@ if __name__ == "__main__": for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): print(f"Received {json.loads(body)}") channel.basic_ack(method_frame.delivery_tag) - - -def read_connection_params(): - credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - parameters = pika.ConnectionParameters( - host=CONFIG.rabbitmq.host, - port=CONFIG.rabbitmq.port, - heartbeat=CONFIG.rabbitmq.heartbeat, - credentials=credentials, - ) - return parameters diff --git a/src/serve.py b/src/serve.py index 7373e57..8b76724 100644 --- a/src/serve.py +++ b/src/serve.py @@ -37,9 +37,9 @@ def make_callback(analysis_endpoint): def main(): - # TODO: implement meaningful checks - logging.info(make_art()) + webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),)) + logging.info(make_art()) logging.info("Starting webserver...") webserver.start()