Merge in RR/pyinfra from multiple_consumer_fix to master
Squashed commit of the following:
commit b2892d2f21c90e4ebc9a48718ab0a834bf65fc32
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Apr 21 17:38:11 2022 +0200
formatting
commit 57b3be627caf3c6a87ec248036b7258e9063709d
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Apr 21 17:36:05 2022 +0200
removed debug print
commit 41db74b9103b55fd6f6d79b27a654042f86c86ea
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Apr 21 17:35:05 2022 +0200
reintroduced usage for basic_consume. Normal consume was blocking other consumers, but basic_consume is not easily testable.
83 lines
2.7 KiB
Python
83 lines
2.7 KiB
Python
import logging
|
|
from multiprocessing import Process
|
|
|
|
import requests
|
|
from retry import retry
|
|
|
|
from pyinfra.config import CONFIG
|
|
from pyinfra.exceptions import AnalysisFailure, ConsumerError
|
|
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
|
|
from pyinfra.queue.consumer import Consumer
|
|
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
|
|
from pyinfra.storage.storages import get_storage
|
|
from pyinfra.utils.banner import show_banner
|
|
from pyinfra.visitor import QueueVisitor, StorageStrategy
|
|
|
|
|
|
def make_callback(analysis_endpoint):
    """Create the callback that forwards queue messages to the analysis service.

    Args:
        analysis_endpoint: Base URL of the analysis service; each operation
            name is appended to it after a "/".

    Returns:
        A function taking a message mapping and returning the analysis result(s).
    """

    def callback(message):
        """Request every operation listed in the message and collect the answers.

        The operations are read from ``message["operations"]`` and default to
        ``["/"]``. ``message["data"]`` is sent as the POST body of every request.

        Returns:
            A dict mapping operation -> decoded JSON response. If the only
            operation is "/", the bare response is returned instead of a dict.

        Raises:
            AnalysisFailure: if a request, its status check or the JSON
                decoding raises any exception.
        """

        def request_analysis(operation):
            # NOTE: for the default operation "/" the resulting URL ends in "//".
            url = f"{analysis_endpoint}/{operation}"
            try:
                logging.debug(f"Requesting analysis from {url}...")
                response = requests.post(url, data=message["data"])
                response.raise_for_status()
                decoded = response.json()
                logging.debug("Received response.")
                return decoded
            except Exception as err:
                logging.warning(f"Exception caught when calling analysis endpoint {url}.")
                raise AnalysisFailure() from err

        operations = message.get("operations", ["/"])
        # Requests are issued one after another, in the order given.
        responses = {operation: request_analysis(operation) for operation in operations}

        # A lone root operation is unwrapped so the caller gets the plain response.
        if len(responses) == 1 and "/" in responses:
            return responses["/"]
        return responses

    return callback
|
|
|
|
|
|
def main():
    """Start the probing webserver and consume the input queue until stopped.

    The probing webserver runs in a separate (non-daemon) process. The consumer
    is built from the configured storage backend, analysis endpoint and queue
    names, and is retried up to three times on ``ConsumerError``.

    Whatever way consumption ends (setup failure, exhausted retries, keyboard
    interrupt or a plain return), the webserver process is terminated and
    joined, so the service never lingers with only the probing endpoint alive.

    Raises:
        ConsumerError: if consuming still fails after all retries.
    """
    show_banner()

    webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),))
    logging.info("Starting webserver...")
    webserver.start()

    try:
        callback = make_callback(CONFIG.rabbitmq.callback.analysis_endpoint)
        storage = get_storage(CONFIG.storage.backend)
        response_strategy = StorageStrategy(storage)
        visitor = QueueVisitor(storage, callback, response_strategy)

        queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)

        @retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
        def consume():
            consumer = Consumer(visitor, queue_manager)
            try:
                consumer.basic_consume_and_publish()
            except Exception as err:
                # Normalise every failure so the retry decorator can match on it.
                raise ConsumerError() from err

        consume()
    except KeyboardInterrupt:
        # Manual interruption is a regular shutdown request, not an error.
        pass
    finally:
        # The webserver is a non-daemon child: if it were left running, the
        # interpreter would block at exit waiting for it. Stop it on every path.
        webserver.terminate()
        webserver.join()
|
|
|
|
|
|
if __name__ == "__main__":
    # The root logger follows the level configured for the service.
    logging_level = CONFIG.service.logging_level
    logging.basicConfig(level=logging_level)

    # Third-party libraries only report errors.
    for library in ("pika", "flask", "urllib3"):
        logging.getLogger(library).setLevel(logging.ERROR)

    main()
|