diff --git a/pyinfra/flask.py b/pyinfra/flask.py index 81e8b94..35e3d3a 100644 --- a/pyinfra/flask.py +++ b/pyinfra/flask.py @@ -1,11 +1,16 @@ +import logging + import requests from flask import Flask, jsonify -from retry import retry from waitress import serve from pyinfra.config import CONFIG +logger = logging.getLogger(__name__) +logger.setLevel(CONFIG.service.logging_level) + + def run_probing_webserver(app, host=None, port=None, mode=None): if not host: host = CONFIG.probing_webserver.host @@ -24,6 +29,7 @@ def run_probing_webserver(app, host=None, port=None, mode=None): def set_up_probing_webserver(): + # TODO: implement meaningful checks app = Flask(__name__) @app.route("/ready", methods=["GET"]) @@ -39,14 +45,13 @@ def set_up_probing_webserver(): return resp @app.route("/prometheus", methods=["GET"]) - def get_analysis_prometheus_endpoint(): - @retry(requests.exceptions.ConnectionError, tries=3, delay=5, jitter=(1, 3)) - def inner(): - prom_endpoint = f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus" - metric = requests.get(prom_endpoint) - metric.raise_for_status() - return metric.text - - return inner() + def get_metrics_from_analysis_endpoint(): + try: + resp = requests.get(f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus") + resp.raise_for_status() + return resp.text + except Exception as err: + logger.warning(f"Got no metrics from analysis prometheus endpoint: {err}") + return "" return app diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 4528a12..3a72632 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -26,6 +26,16 @@ def parse_args(): return args +def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension): + return f"{dossier_id}/{file_id}{extension}" + + +def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: + data = gzip.compress(result.encode()) + path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, 
CONFIG.service.response.extension) + storage.put_object(bucket_name, path_gz, data) + + def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, ".ORIGIN.pdf.gz") @@ -57,13 +67,3 @@ if __name__ == "__main__": elif args.command == "purge": storage.clear_bucket(bucket_name) - - -def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension): - return f"{dossier_id}/{file_id}{extension}" - - -def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: - data = gzip.compress(result.encode()) - path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) - storage.put_object(bucket_name, path_gz, data) diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 938561c..3590d3e 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -6,6 +6,34 @@ from pyinfra.config import CONFIG from pyinfra.storage.storages import get_s3_storage +def read_connection_params(): + credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) + parameters = pika.ConnectionParameters( + host=CONFIG.rabbitmq.host, + port=CONFIG.rabbitmq.port, + heartbeat=CONFIG.rabbitmq.heartbeat, + credentials=credentials, + ) + return parameters + + +def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: + channel = connection.channel() + channel.basic_qos(prefetch_count=1) + return channel + + +def declare_queue(channel, queue: str): + args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter} + return channel.queue_declare(queue=queue, auto_delete=False, arguments=args) + + +def make_connection() -> pika.BlockingConnection: + parameters = read_connection_params() + connection = pika.BlockingConnection(parameters) + return connection + + def build_message_bodies(): storage = get_s3_storage() for 
bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket): @@ -21,23 +49,6 @@ def build_message_bodies(): ).encode() -def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: - channel = connection.channel() - channel.basic_qos(prefetch_count=CONFIG.rabbitmq.prefetch_count) - return channel - - -def declare_queue(channel, queue: str): - args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter} - return channel.queue_declare(queue=queue, auto_delete=False, arguments=args) - - -def make_connection() -> pika.BlockingConnection: - parameters = read_connection_params() - connection = pika.BlockingConnection(parameters) - return connection - - if __name__ == "__main__": connection = make_connection() @@ -52,14 +63,3 @@ if __name__ == "__main__": for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): print(f"Received {json.loads(body)}") channel.basic_ack(method_frame.delivery_tag) - - -def read_connection_params(): - credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - parameters = pika.ConnectionParameters( - host=CONFIG.rabbitmq.host, - port=CONFIG.rabbitmq.port, - heartbeat=CONFIG.rabbitmq.heartbeat, - credentials=credentials, - ) - return parameters diff --git a/src/serve.py b/src/serve.py index 7373e57..8b76724 100644 --- a/src/serve.py +++ b/src/serve.py @@ -37,9 +37,9 @@ def make_callback(analysis_endpoint): def main(): - # TODO: implement meaningful checks - logging.info(make_art()) + webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),)) + logging.info(make_art()) logging.info("Starting webserver...") webserver.start()