feat: add async retry for tenant server calls
This commit is contained in:
parent
66aaeca928
commit
2a2028085e
17
poetry.lock
generated
17
poetry.lock
generated
@ -3413,6 +3413,21 @@ anyio = ">=3.4.0,<5"
|
|||||||
[package.extras]
|
[package.extras]
|
||||||
full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"]
|
full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tenacity"
|
||||||
|
version = "8.5.0"
|
||||||
|
description = "Retry code until it succeeds"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.8"
|
||||||
|
files = [
|
||||||
|
{file = "tenacity-8.5.0-py3-none-any.whl", hash = "sha256:b594c2a5945830c267ce6b79a166228323ed52718f30302c1359836112346687"},
|
||||||
|
{file = "tenacity-8.5.0.tar.gz", hash = "sha256:8bc6c0c8a09b31e6cad13c47afbed1a567518250a9a171418582ed8d9c20ca78"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[package.extras]
|
||||||
|
doc = ["reno", "sphinx"]
|
||||||
|
test = ["pytest", "tornado (>=4.5)", "typeguard"]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tomli"
|
name = "tomli"
|
||||||
version = "2.0.1"
|
version = "2.0.1"
|
||||||
@ -3802,4 +3817,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools",
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.0"
|
lock-version = "2.0"
|
||||||
python-versions = ">=3.10,<3.11"
|
python-versions = ">=3.10,<3.11"
|
||||||
content-hash = "5d7e7dddc7b3aca84f5263def609a2dee5c7155b940d54cd4a158bb72b2bf496"
|
content-hash = "8a0ce0f234721a8db75619f13e108a94d03a7db14e532c4b8799cf96e8927f45"
|
||||||
|
|||||||
@ -13,7 +13,12 @@ from aio_pika.abc import (
|
|||||||
AbstractIncomingMessage,
|
AbstractIncomingMessage,
|
||||||
)
|
)
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
from retry import retry
|
from tenacity import (
|
||||||
|
retry,
|
||||||
|
retry_if_exception_type,
|
||||||
|
stop_after_attempt,
|
||||||
|
wait_exponential_jitter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -190,26 +195,31 @@ class AsyncQueueManager:
|
|||||||
)
|
)
|
||||||
logger.info(f"Published result to queue {tenant_id}.")
|
logger.info(f"Published result to queue {tenant_id}.")
|
||||||
|
|
||||||
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(aiohttp.ClientError))
|
@retry(
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
wait=wait_exponential_jitter(initial=1, max=10),
|
||||||
|
retry=retry_if_exception_type(aiohttp.ClientResponseError),
|
||||||
|
reraise=True,
|
||||||
|
)
|
||||||
async def fetch_active_tenants(self) -> Set[str]:
|
async def fetch_active_tenants(self) -> Set[str]:
|
||||||
try:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with session.get(self.tenant_service_url) as response:
|
||||||
async with session.get(self.tenant_service_url) as response:
|
response.raise_for_status()
|
||||||
response.raise_for_status()
|
if response.headers["content-type"].lower() == "application/json":
|
||||||
if response.headers["content-type"].lower() == "application/json":
|
data = await response.json()
|
||||||
data = await response.json()
|
return {tenant["tenantId"] for tenant in data}
|
||||||
return {tenant["tenantId"] for tenant in data}
|
else:
|
||||||
else:
|
logger.error(
|
||||||
logger.error(
|
f"Failed to fetch active tenants. Content type is not JSON: {response.headers['content-type'].lower()}"
|
||||||
f"Failed to fetch active tenants. Content type is not JSON: {response.headers['content-type'].lower()}"
|
)
|
||||||
)
|
return set()
|
||||||
return set()
|
|
||||||
except aiohttp.ClientError as e:
|
|
||||||
logger.error(f"Error fetching active tenants: {e}")
|
|
||||||
return set()
|
|
||||||
|
|
||||||
async def initialize_tenant_queues(self) -> None:
|
async def initialize_tenant_queues(self) -> None:
|
||||||
active_tenants = await self.fetch_active_tenants()
|
try:
|
||||||
|
active_tenants = await self.fetch_active_tenants()
|
||||||
|
except aiohttp.ClientResponseError:
|
||||||
|
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
|
||||||
|
active_tenants = set()
|
||||||
for tenant_id in active_tenants:
|
for tenant_id in active_tenants:
|
||||||
await self.create_tenant_queues(tenant_id)
|
await self.create_tenant_queues(tenant_id)
|
||||||
|
|
||||||
|
|||||||
@ -36,6 +36,7 @@ wcwidth = "<=0.2.12"
|
|||||||
azure-monitor-opentelemetry = "^1.6.0"
|
azure-monitor-opentelemetry = "^1.6.0"
|
||||||
aio-pika = "^9.4.2"
|
aio-pika = "^9.4.2"
|
||||||
aiohttp = "^3.9.5"
|
aiohttp = "^3.9.5"
|
||||||
|
tenacity = "^8.5.0"
|
||||||
|
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
pytest = "^7"
|
pytest = "^7"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user