From bd2f0b9b9a4d77f005adb86b6be25308297b085d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Wed, 23 Oct 2024 16:06:06 +0200 Subject: [PATCH] feat: switch out tenacity retry with kn_utils --- .pre-commit-config.yaml | 1 + pyinfra/examples.py | 6 ------ pyinfra/queue/async_manager.py | 33 +++++++++++---------------------- pyinfra/queue/manager.py | 13 +++++-------- pyinfra/webserver/utils.py | 12 +++--------- 5 files changed, 20 insertions(+), 45 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6cc462c..b0518a1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,6 +26,7 @@ repos: rev: v3.0.0a5 hooks: - id: pylint + language: system args: - --disable=C0111,R0903 - --max-line-length=120 diff --git a/pyinfra/examples.py b/pyinfra/examples.py index ddd45ea..6c8bf9d 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -5,12 +5,6 @@ from aiormq.exceptions import AMQPConnectionError from dynaconf import Dynaconf from fastapi import FastAPI from kn_utils.logging import logger -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) from pyinfra.config.loader import get_pyinfra_validators, validate_settings from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 4d31163..e0c82c5 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -20,13 +20,7 @@ from aio_pika.exceptions import ( ) from aiormq.exceptions import AMQPConnectionError from kn_utils.logging import logger -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, - wait_exponential_jitter, -) +from kn_utils.retry import retry @dataclass @@ -82,9 +76,8 @@ class AsyncQueueManager: self.message_count: int = 0 @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type(AMQPConnectionError), + tries=5, + exceptions=AMQPConnectionError, reraise=True, ) async def connect(self) -> None: @@ -121,9 +114,8 @@ class AsyncQueueManager: return True @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)), + tries=5, + exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, ) async def setup_exchanges(self) -> None: @@ -138,9 +130,8 @@ class AsyncQueueManager: ) @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)), + tries=5, + exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, ) async def setup_tenant_queue(self) -> None: @@ -263,9 +254,8 @@ class AsyncQueueManager: logger.info(f"Published result to queue {tenant_id}.") @retry( - stop=stop_after_attempt(5), - wait=wait_exponential_jitter(initial=1, max=10), - retry=retry_if_exception_type((aiohttp.ClientResponseError, aiohttp.ClientConnectorError)), + tries=5, + exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, ) async def fetch_active_tenants(self) -> Set[str]: @@ -282,9 +272,8 @@ class AsyncQueueManager: return set() @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)), + tries=5, + exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, ) async def initialize_tenant_queues(self) -> None: diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index 45bf0b8..57b5dcb 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -10,9 +10,8 @@ import pika import pika.exceptions from dynaconf import Dynaconf from kn_utils.logging import logger +from kn_utils.retry import retry from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection -from retry import retry -from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from pyinfra.config.loader import validate_settings from pyinfra.config.validators import queue_manager_validators @@ -59,9 +58,8 @@ class QueueManager: return pika.ConnectionParameters(**pika_connection_params) @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type((pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker)), + tries=5, + exceptions=(pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker), reraise=True, ) def establish_connection(self): @@ -95,9 +93,8 @@ class QueueManager: return False @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type(pika.exceptions.AMQPConnectionError), + tries=5, + exceptions=pika.exceptions.AMQPConnectionError, reraise=True, ) def start_consuming(self, message_processor: Callable): diff --git a/pyinfra/webserver/utils.py b/pyinfra/webserver/utils.py index 478ffe7..c0607b1 100644 --- a/pyinfra/webserver/utils.py +++ b/pyinfra/webserver/utils.py @@ -10,21 +10,15 @@ import uvicorn from dynaconf import Dynaconf from fastapi import FastAPI from kn_utils.logging import logger -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) +from kn_utils.retry import retry from pyinfra.config.loader import validate_settings from pyinfra.config.validators import webserver_validators @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=2, max=30), - retry=retry_if_exception_type((Exception,)), # You might want to be more specific here + tries=5, + exceptions=Exception, reraise=True, ) def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread: