diff --git a/pyinfra/examples.py b/pyinfra/examples.py index bf8bf1d..e35841a 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -1,4 +1,5 @@ import asyncio +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from dynaconf import Dynaconf from fastapi import FastAPI @@ -6,8 +7,8 @@ from kn_utils.logging import logger from pyinfra.config.loader import get_pyinfra_validators, validate_settings from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig -from pyinfra.queue.callback import Callback from pyinfra.queue.manager import QueueManager +from pyinfra.queue.callback import Callback from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace from pyinfra.webserver.prometheus import ( add_prometheus_endpoint, @@ -20,6 +21,12 @@ from pyinfra.webserver.utils import ( ) +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((Exception,)), # You might want to be more specific here + reraise=True, +) async def run_async_queues(manager, app, port, host): """Run the async webserver and the async queue manager concurrently.""" try: @@ -32,6 +39,21 @@ async def run_async_queues(manager, app, port, host): await manager.shutdown() +# async def run_async_queues(manager, app, port, host): +# server = None +# try: +# await manager.run() +# server = await asyncio.start_server(app, host, port) +# await server.serve_forever() +# except Exception as e: +# logger.error(f"An error occurred while running async queues: {e}") +# finally: +# if server: +# server.close() +# await server.wait_closed() +# await manager.shutdown() + + def start_standard_queue_consumer( callback: Callback, settings: Dynaconf, @@ -91,6 +113,12 @@ def start_standard_queue_consumer( elif isinstance(manager, QueueManager): webserver = create_webserver_thread_from_settings(app, settings) webserver.start() - manager.start_consuming(callback) + try: + manager.start_consuming(callback) + except Exception as e: + logger.error(f"An error occurred while consuming messages: {e}") + # Optionally, you can choose to exit here if you want to restart the process + # import sys + # sys.exit(1) else: logger.warning(f"Behavior for type {type(manager)} is not defined") diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index c86e88d..4d3fe31 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -21,7 +21,11 @@ from tenacity import ( retry_if_exception_type, stop_after_attempt, wait_exponential_jitter, + wait_exponential, + retry_if_exception_type, ) +from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError +from aiormq.exceptions import AMQPConnectionError @dataclass @@ -74,15 +78,34 @@ class AsyncQueueManager: self.message_count: int = 0 + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type(AMQPConnectionError), + reraise=True, + ) async def connect(self) -> None: + logger.info("Attempting to connect to RabbitMQ...") self.connection = await connect_robust(**self.config.connection_params) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=1) + logger.info("Successfully connected to RabbitMQ") async def is_ready(self) -> bool: - await self.connect() - return self.connection is not None and not self.connection.is_closed + if self.connection is None or self.connection.is_closed: + try: + await self.connect() + except Exception as e: + logger.error(f"Failed to connect to RabbitMQ: {e}") + return False + return True + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)), + reraise=True, + ) async def setup_exchanges(self) -> None: self.tenant_exchange = await self.channel.declare_exchange( self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True @@ -94,6 +117,12 @@ class AsyncQueueManager: self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True ) + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)), + reraise=True, + ) async def setup_tenant_queue(self) -> None: self.tenant_exchange_queue = await self.channel.declare_queue( f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}", @@ -230,6 +259,12 @@ class AsyncQueueManager: ) return set() + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)), + reraise=True, + ) async def initialize_tenant_queues(self) -> None: try: active_tenants = await self.fetch_active_tenants() @@ -247,6 +282,9 @@ class AsyncQueueManager: await self.setup_tenant_queue() logger.info("RabbitMQ handler is running. Press CTRL+C to exit.") + except AMQPConnectionError as e: + logger.error(f"Failed to establish connection to RabbitMQ: {e}") + # TODO: implement a custom exception handling strategy here except asyncio.CancelledError: logger.warning("Operation cancelled.") except Exception as e: diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index b1d1a77..45bf0b8 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -12,6 +12,7 @@ from dynaconf import Dynaconf from kn_utils.logging import logger from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection from retry import retry +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from pyinfra.config.loader import validate_settings from pyinfra.config.validators import queue_manager_validators @@ -42,6 +43,9 @@ class QueueManager: signal.signal(signal.SIGTERM, self._handle_stop_signal) signal.signal(signal.SIGINT, self._handle_stop_signal) + self.max_retries = settings.rabbitmq.max_retries or 5 + self.max_delay = settings.rabbitmq.max_delay or 60 + @staticmethod def create_connection_parameters(settings: Dynaconf): credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password) @@ -54,9 +58,13 @@ class QueueManager: return pika.ConnectionParameters(**pika_connection_params) - @retry(tries=3, delay=5, jitter=(1, 3), logger=logger) + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker)), + reraise=True, + ) def establish_connection(self): - # TODO: set sensible retry parameters if self.connection and self.connection.is_open: logger.debug("Connection to RabbitMQ already established.") return @@ -79,19 +87,32 @@ class QueueManager: logger.info("Connection to RabbitMQ established, channel open.") def is_ready(self): - self.establish_connection() - return self.channel.is_open + try: + self.establish_connection() + return self.channel.is_open + except Exception as e: + logger.error(f"Failed to establish connection: {e}") + return False - @retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger) + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type(pika.exceptions.AMQPConnectionError), + reraise=True, + ) def start_consuming(self, message_processor: Callable): on_message_callback = self._make_on_message_callback(message_processor) try: self.establish_connection() self.channel.basic_consume(self.input_queue, on_message_callback) + logger.info("Starting to consume messages...") self.channel.start_consuming() - except Exception: - logger.error("An unexpected error occurred while consuming messages. Consuming will stop.", exc_info=True) + except pika.exceptions.AMQPConnectionError as e: + logger.error(f"AMQP Connection Error: {e}") + raise + except Exception as e: + logger.error(f"An unexpected error occurred while consuming messages: {e}", exc_info=True) raise finally: self.stop_consuming() diff --git a/pyinfra/webserver/utils.py b/pyinfra/webserver/utils.py index 1843f46..5debb15 100644 --- a/pyinfra/webserver/utils.py +++ b/pyinfra/webserver/utils.py @@ -3,6 +3,8 @@ import inspect import logging import signal import threading +import time +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from typing import Callable import uvicorn @@ -14,6 +16,12 @@ from pyinfra.config.loader import validate_settings from pyinfra.config.validators import webserver_validators +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=2, max=30), + retry=retry_if_exception_type((Exception,)), # You might want to be more specific here + reraise=True, +) def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread: validate_settings(settings, validators=webserver_validators) return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host) @@ -23,7 +31,22 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr """Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join(). Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated. """ - thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)) + + def run_server(): + retries = 5 + for attempt in range(retries): + try: + uvicorn.run(app, port=port, host=host, log_level=logging.WARNING) + break + except Exception as e: + if attempt < retries - 1: # if it's not the last attempt + logger.warning(f"Attempt {attempt + 1} failed to start the server: {e}. Retrying...") + time.sleep(2**attempt) # exponential backoff + else: + logger.error(f"Failed to start the server after {retries} attempts: {e}") + raise + + thread = threading.Thread(target=run_server) thread.daemon = True return thread diff --git a/pyproject.toml b/pyproject.toml index 23d0d1c..296cd79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyinfra" -version = "3.2.5" +version = "3.2.6" description = "" authors = ["Team Research "] license = "All rights reseverd"