Merge in RR/pyinfra from add-prometheus-metrics to master

Squashed commit of the following:

commit 3736e867bfb105f2c2601f6d25343c996027cc5f
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:35:25 2022 +0100

    removed obsolete config entry

commit dc191b17d863ec4f8009fb130c2c3a78d4116969
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:34:12 2022 +0100

    removed obsolete dependency

commit 5ba9765e88da7dd15700b211794f433a6f7ea0df
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:32:37 2022 +0100

    changed error handling for prometheus endpoint

commit 894a6b5d4c7026b9a703a8b2cd70641e7ed7323b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Mar 16 11:16:39 2022 +0100

    fixed definition order broken by auto-refac; reduced prometheus code to only forwarding to analysis endpoint

commit 9f3c884c75289c7b558e8cc8fb0154b5ddd3a323
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 16 08:59:45 2022 +0100

    black is back

commit 5950799e03f3578ff58f19430494c6b0c223c0f6
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 16 08:59:11 2022 +0100

    add prometheus memory peak monitoring, combine report with analysis report
import logging
from multiprocessing import Process

import requests
from retry import retry

from pyinfra.config import CONFIG, make_art
from pyinfra.exceptions import AnalysisFailure, ConsumerError
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.storage.storages import get_storage
from pyinfra.visitor import QueueVisitor, StorageStrategy


def make_callback(analysis_endpoint):
    """Build a callback that forwards queued messages to the analysis endpoint."""

    def callback(message):
        def perform_operation(operation):
            # POST the message payload to <analysis_endpoint>/<operation> and
            # return the decoded JSON response.
            endpoint = f"{analysis_endpoint}/{operation}"
            try:
                logging.debug(f"Requesting analysis from {endpoint}...")
                analysis_response = requests.post(endpoint, data=message["data"])
                analysis_response.raise_for_status()
                analysis_response = analysis_response.json()
                logging.debug("Received response.")
                return analysis_response
            except Exception as err:
                logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
                raise AnalysisFailure() from err

        # Run every requested operation and key the results by operation name.
        operations = message.get("operations", ["/"])
        results = map(perform_operation, operations)
        result = dict(zip(operations, results))
        return result

    return callback


def main():
    # Serve the probing endpoints from a separate process.
    webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),))
    logging.info(make_art())
    logging.info("Starting webserver...")
    webserver.start()

    # Wire up the queue visitor: analysis callback, storage backend, response strategy.
    callback = make_callback(CONFIG.rabbitmq.callback.analysis_endpoint)
    storage = get_storage(CONFIG.storage.backend)
    response_strategy = StorageStrategy(storage)
    visitor = QueueVisitor(storage, callback, response_strategy)

    queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)

    # Retry consuming a few times before giving up for good.
    @retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
    def consume():
        try:
            consumer = Consumer(visitor, queue_manager)
            consumer.consume_and_publish()
        except Exception as err:
            raise ConsumerError from err

    try:
        consume()
    except KeyboardInterrupt:
        pass
    except ConsumerError:
        # The consumer failed despite retries; stop the probing webserver before re-raising.
        webserver.terminate()
        raise

    webserver.join()


if __name__ == "__main__":
    logging_level = CONFIG.service.logging_level
    logging.basicConfig(level=logging_level)

    # Silence noisy third-party loggers.
    logging.getLogger("pika").setLevel(logging.ERROR)
    logging.getLogger("flask").setLevel(logging.ERROR)
    logging.getLogger("urllib3").setLevel(logging.ERROR)

    main()
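
For illustration only, a minimal sketch of how the callback built by make_callback above could be exercised directly. The endpoint URL, operation names, and payload are hypothetical placeholders, not values from this repository; in production the message arrives via the RabbitMQ consumer and the endpoint comes from CONFIG.rabbitmq.callback.analysis_endpoint.

    # Hypothetical usage sketch (not part of the module above).
    callback = make_callback("http://analysis-service:8080")

    message = {
        "data": b"<raw document bytes>",          # placeholder payload
        "operations": ["text", "metadata"],       # placeholder operation names
    }

    # Each operation is POSTed to <endpoint>/<operation>; the return value is a
    # dict keyed by operation name. If "operations" is missing, ["/"] is used.
    result = callback(message)
    # e.g. {"text": {...}, "metadata": {...}}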