Merge branch 'RES-842-pyinfra-fix-rabbit-mq-handler-shuts-down-when-queues-not-available-yet' into 'master'

fix(queuemanager): add retries to prevent container from shutting down when queues are not available yet

See merge request knecon/research/pyinfra!94
This commit is contained in:
Julius Unverfehrt 2024-08-30 13:59:02 +02:00
commit 5f31e2b15f
5 changed files with 123 additions and 13 deletions

View File

@ -1,4 +1,5 @@
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from dynaconf import Dynaconf
from fastapi import FastAPI
@ -6,8 +7,8 @@ from kn_utils.logging import logger
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
from pyinfra.queue.callback import Callback
from pyinfra.queue.manager import QueueManager
from pyinfra.queue.callback import Callback
from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace
from pyinfra.webserver.prometheus import (
add_prometheus_endpoint,
@ -20,6 +21,12 @@ from pyinfra.webserver.utils import (
)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((Exception,)), # You might want to be more specific here
reraise=True,
)
async def run_async_queues(manager, app, port, host):
"""Run the async webserver and the async queue manager concurrently."""
try:
@ -32,6 +39,21 @@ async def run_async_queues(manager, app, port, host):
await manager.shutdown()
# async def run_async_queues(manager, app, port, host):
# server = None
# try:
# await manager.run()
# server = await asyncio.start_server(app, host, port)
# await server.serve_forever()
# except Exception as e:
# logger.error(f"An error occurred while running async queues: {e}")
# finally:
# if server:
# server.close()
# await server.wait_closed()
# await manager.shutdown()
def start_standard_queue_consumer(
callback: Callback,
settings: Dynaconf,
@ -91,6 +113,12 @@ def start_standard_queue_consumer(
elif isinstance(manager, QueueManager):
webserver = create_webserver_thread_from_settings(app, settings)
webserver.start()
manager.start_consuming(callback)
try:
manager.start_consuming(callback)
except Exception as e:
logger.error(f"An error occurred while consuming messages: {e}")
# Optionally, you can choose to exit here if you want to restart the process
# import sys
# sys.exit(1)
else:
logger.warning(f"Behavior for type {type(manager)} is not defined")

View File

@ -21,7 +21,11 @@ from tenacity import (
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
wait_exponential,
retry_if_exception_type,
)
from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError
from aiormq.exceptions import AMQPConnectionError
@dataclass
@ -74,15 +78,34 @@ class AsyncQueueManager:
self.message_count: int = 0
@retry(
    retry=retry_if_exception_type(AMQPConnectionError),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5),
    reraise=True,
)
async def connect(self) -> None:
    """Connect to RabbitMQ and open a channel on the new connection.

    The connection is created with ``connect_robust`` from the parameters in
    ``self.config.connection_params`` and stored on ``self.connection``; the
    channel opened on it is stored on ``self.channel``.

    The decorator retries the whole coroutine when ``AMQPConnectionError`` is
    raised: at most five attempts, with an exponential wait bounded to the
    4-60 second range, and the last error is re-raised once attempts run out.
    """
    logger.info("Attempting to connect to RabbitMQ...")

    connection_kwargs = self.config.connection_params
    self.connection = await connect_robust(**connection_kwargs)
    self.channel = await self.connection.channel()

    # Apply QoS with prefetch_count=1 on the freshly opened channel.
    await self.channel.set_qos(prefetch_count=1)

    logger.info("Successfully connected to RabbitMQ")
async def is_ready(self) -> bool:
await self.connect()
return self.connection is not None and not self.connection.is_closed
if self.connection is None or self.connection.is_closed:
try:
await self.connect()
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
return False
return True
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)),
reraise=True,
)
async def setup_exchanges(self) -> None:
self.tenant_exchange = await self.channel.declare_exchange(
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
@ -94,6 +117,12 @@ class AsyncQueueManager:
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)),
reraise=True,
)
async def setup_tenant_queue(self) -> None:
self.tenant_exchange_queue = await self.channel.declare_queue(
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
@ -230,6 +259,12 @@ class AsyncQueueManager:
)
return set()
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)),
reraise=True,
)
async def initialize_tenant_queues(self) -> None:
try:
active_tenants = await self.fetch_active_tenants()
@ -247,6 +282,9 @@ class AsyncQueueManager:
await self.setup_tenant_queue()
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
except AMQPConnectionError as e:
logger.error(f"Failed to establish connection to RabbitMQ: {e}")
# TODO: implement a custom exception handling strategy here
except asyncio.CancelledError:
logger.warning("Operation cancelled.")
except Exception as e:

View File

@ -12,6 +12,7 @@ from dynaconf import Dynaconf
from kn_utils.logging import logger
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from retry import retry
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import queue_manager_validators
@ -42,6 +43,9 @@ class QueueManager:
signal.signal(signal.SIGTERM, self._handle_stop_signal)
signal.signal(signal.SIGINT, self._handle_stop_signal)
self.max_retries = settings.rabbitmq.max_retries or 5
self.max_delay = settings.rabbitmq.max_delay or 60
@staticmethod
def create_connection_parameters(settings: Dynaconf):
credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password)
@ -54,9 +58,13 @@ class QueueManager:
return pika.ConnectionParameters(**pika_connection_params)
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker)),
reraise=True,
)
def establish_connection(self):
# TODO: set sensible retry parameters
if self.connection and self.connection.is_open:
logger.debug("Connection to RabbitMQ already established.")
return
@ -79,19 +87,32 @@ class QueueManager:
logger.info("Connection to RabbitMQ established, channel open.")
def is_ready(self):
self.establish_connection()
return self.channel.is_open
try:
self.establish_connection()
return self.channel.is_open
except Exception as e:
logger.error(f"Failed to establish connection: {e}")
return False
@retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(pika.exceptions.AMQPConnectionError),
reraise=True,
)
def start_consuming(self, message_processor: Callable):
on_message_callback = self._make_on_message_callback(message_processor)
try:
self.establish_connection()
self.channel.basic_consume(self.input_queue, on_message_callback)
logger.info("Starting to consume messages...")
self.channel.start_consuming()
except Exception:
logger.error("An unexpected error occurred while consuming messages. Consuming will stop.", exc_info=True)
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"AMQP Connection Error: {e}")
raise
except Exception as e:
logger.error(f"An unexpected error occurred while consuming messages: {e}", exc_info=True)
raise
finally:
self.stop_consuming()

View File

@ -3,6 +3,8 @@ import inspect
import logging
import signal
import threading
import time
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from typing import Callable
import uvicorn
@ -14,6 +16,12 @@ from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import webserver_validators
@retry(
    retry=retry_if_exception_type((Exception,)),  # You might want to be more specific here
    wait=wait_exponential(multiplier=1, min=2, max=30),
    stop=stop_after_attempt(5),
    reraise=True,
)
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
    """Validate the webserver settings and build the webserver thread from them.

    Args:
        app: The FastAPI application handed to ``create_webserver_thread``.
        settings: Settings object; ``settings.webserver.port`` and
            ``settings.webserver.host`` are read after validation.

    Returns:
        The thread returned by ``create_webserver_thread``. This function does
        not start it.

    Any exception raised here triggers a retry (up to five attempts, waits
    bounded to 2-30 seconds); the last exception is re-raised.
    """
    # NOTE(review): every Exception is retried, including validation failures,
    # which presumably will not succeed on a later attempt - confirm whether
    # the retry condition should be narrowed.
    validate_settings(settings, validators=webserver_validators)

    webserver_settings = settings.webserver
    return create_webserver_thread(
        app=app,
        port=webserver_settings.port,
        host=webserver_settings.host,
    )
@ -23,7 +31,22 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
"""Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
"""
thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING))
def run_server():
retries = 5
for attempt in range(retries):
try:
uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)
break
except Exception as e:
if attempt < retries - 1: # if it's not the last attempt
logger.warning(f"Attempt {attempt + 1} failed to start the server: {e}. Retrying...")
time.sleep(2**attempt) # exponential backoff
else:
logger.error(f"Failed to start the server after {retries} attempts: {e}")
raise
thread = threading.Thread(target=run_server)
thread.daemon = True
return thread

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pyinfra"
version = "3.2.5"
version = "3.2.6"
description = ""
authors = ["Team Research <research@knecon.com>"]
license = "All rights reserved"