pyinfra/src/serve.py
Julius Unverfehrt 391bcc482f Pull request #21: refactoring of prometheus tunneling
Merge in RR/pyinfra from add-prometheus-metrics to master

Squashed commit of the following:

commit 3736e867bfb105f2c2601f6d25343c996027cc5f
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:35:25 2022 +0100

    removed obsolete config entry

commit dc191b17d863ec4f8009fb130c2c3a78d4116969
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:34:12 2022 +0100

    removed obsolete dependency

commit 5ba9765e88da7dd15700b211794f433a6f7ea0df
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:32:37 2022 +0100

    changed error handling for prometheus endpoint

commit 894a6b5d4c7026b9a703a8b2cd70641e7ed7323b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:16:39 2022 +0100

    fixed definition order broken by auto-refac; reduced prometheus code to only forwarding to analysis endpoint

commit 9f3c884c75289c7b558e8cc8fb0154b5ddd3a323
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 16 08:59:45 2022 +0100

    black is back

commit 5950799e03f3578ff58f19430494c6b0c223c0f6
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 16 08:59:11 2022 +0100

    add prometheus memory peak monitoring, combine report with analysis report
2022-03-16 11:37:37 +01:00

80 lines
2.6 KiB
Python

import logging
from multiprocessing import Process
import requests
from retry import retry
from pyinfra.config import CONFIG, make_art
from pyinfra.exceptions import AnalysisFailure, ConsumerError
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.storage.storages import get_storage
from pyinfra.visitor import QueueVisitor, StorageStrategy
def make_callback(analysis_endpoint):
def callback(message):
def perform_operation(operation):
endpoint = f"{analysis_endpoint}/{operation}"
try:
logging.debug(f"Requesting analysis from {endpoint}...")
analysis_response = requests.post(endpoint, data=message["data"])
analysis_response.raise_for_status()
analysis_response = analysis_response.json()
logging.debug(f"Received response.")
return analysis_response
except Exception as err:
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
raise AnalysisFailure() from err
operations = message.get("operations", ["/"])
results = map(perform_operation, operations)
result = dict(zip(operations, results))
return result
return callback
def main():
webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),))
logging.info(make_art())
logging.info("Starting webserver...")
webserver.start()
callback = make_callback(CONFIG.rabbitmq.callback.analysis_endpoint)
storage = get_storage(CONFIG.storage.backend)
response_strategy = StorageStrategy(storage)
visitor = QueueVisitor(storage, callback, response_strategy)
queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)
@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
def consume():
try:
consumer = Consumer(visitor, queue_manager)
consumer.consume_and_publish()
except Exception as err:
raise ConsumerError from err
try:
consume()
except KeyboardInterrupt:
pass
except ConsumerError:
webserver.terminate()
raise
webserver.join()
if __name__ == "__main__":
logging_level = CONFIG.service.logging_level
logging.basicConfig(level=logging_level)
logging.getLogger("pika").setLevel(logging.ERROR)
logging.getLogger("flask").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
main()