feat: switch out tenacity retry with kn_utils
This commit is contained in:
parent
131afd7d3e
commit
bd2f0b9b9a
@ -26,6 +26,7 @@ repos:
|
||||
rev: v3.0.0a5
|
||||
hooks:
|
||||
- id: pylint
|
||||
language: system
|
||||
args:
|
||||
- --disable=C0111,R0903
|
||||
- --max-line-length=120
|
||||
|
||||
@ -5,12 +5,6 @@ from aiormq.exceptions import AMQPConnectionError
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
|
||||
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
|
||||
|
||||
@ -20,13 +20,7 @@ from aio_pika.exceptions import (
|
||||
)
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from kn_utils.logging import logger
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
wait_exponential_jitter,
|
||||
)
|
||||
from kn_utils.retry import retry
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -82,9 +76,8 @@ class AsyncQueueManager:
|
||||
self.message_count: int = 0
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type(AMQPConnectionError),
|
||||
tries=5,
|
||||
exceptions=AMQPConnectionError,
|
||||
reraise=True,
|
||||
)
|
||||
async def connect(self) -> None:
|
||||
@ -121,9 +114,8 @@ class AsyncQueueManager:
|
||||
return True
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)),
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
async def setup_exchanges(self) -> None:
|
||||
@ -138,9 +130,8 @@ class AsyncQueueManager:
|
||||
)
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)),
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
async def setup_tenant_queue(self) -> None:
|
||||
@ -263,9 +254,8 @@ class AsyncQueueManager:
|
||||
logger.info(f"Published result to queue {tenant_id}.")
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential_jitter(initial=1, max=10),
|
||||
retry=retry_if_exception_type((aiohttp.ClientResponseError, aiohttp.ClientConnectorError)),
|
||||
tries=5,
|
||||
exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError),
|
||||
reraise=True,
|
||||
)
|
||||
async def fetch_active_tenants(self) -> Set[str]:
|
||||
@ -282,9 +272,8 @@ class AsyncQueueManager:
|
||||
return set()
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type((AMQPConnectionError, ChannelInvalidStateError)),
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
async def initialize_tenant_queues(self) -> None:
|
||||
|
||||
@ -10,9 +10,8 @@ import pika
|
||||
import pika.exceptions
|
||||
from dynaconf import Dynaconf
|
||||
from kn_utils.logging import logger
|
||||
from kn_utils.retry import retry
|
||||
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
|
||||
from retry import retry
|
||||
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
|
||||
|
||||
from pyinfra.config.loader import validate_settings
|
||||
from pyinfra.config.validators import queue_manager_validators
|
||||
@ -59,9 +58,8 @@ class QueueManager:
|
||||
return pika.ConnectionParameters(**pika_connection_params)
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type((pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker)),
|
||||
tries=5,
|
||||
exceptions=(pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker),
|
||||
reraise=True,
|
||||
)
|
||||
def establish_connection(self):
|
||||
@ -95,9 +93,8 @@ class QueueManager:
|
||||
return False
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type(pika.exceptions.AMQPConnectionError),
|
||||
tries=5,
|
||||
exceptions=pika.exceptions.AMQPConnectionError,
|
||||
reraise=True,
|
||||
)
|
||||
def start_consuming(self, message_processor: Callable):
|
||||
|
||||
@ -10,21 +10,15 @@ import uvicorn
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
from kn_utils.retry import retry
|
||||
|
||||
from pyinfra.config.loader import validate_settings
|
||||
from pyinfra.config.validators import webserver_validators
|
||||
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=2, max=30),
|
||||
retry=retry_if_exception_type((Exception,)), # You might want to be more specific here
|
||||
tries=5,
|
||||
exceptions=Exception,
|
||||
reraise=True,
|
||||
)
|
||||
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user