Merge branch 'feature/RED-10441' into 'master'
RED-10441: fix abandoned queues See merge request knecon/research/pyinfra!102
This commit is contained in:
commit
8891249d7a
@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
@ -26,19 +27,27 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||
"""Run the async webserver and the async queue manager concurrently."""
|
||||
queue_task = None
|
||||
webserver_task = None
|
||||
tenant_api_available = True
|
||||
try:
|
||||
queue_task = asyncio.create_task(manager.run(), name="queues")
|
||||
active_tenants = await manager.fetch_active_tenants()
|
||||
|
||||
queue_task = asyncio.create_task(manager.run(active_tenants=active_tenants), name="queues")
|
||||
webserver_task = asyncio.create_task(run_async_webserver(app, port, host), name="webserver")
|
||||
await asyncio.gather(queue_task, webserver_task)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Main task was cancelled, initiating shutdown.")
|
||||
except AMQPConnectionError as e:
|
||||
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.warning("Tenant server did not answer - shutting down.")
|
||||
tenant_api_available = False
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
logger.info("Signal received, shutting down...")
|
||||
if not tenant_api_available:
|
||||
sys.exit(0)
|
||||
if queue_task and not queue_task.done():
|
||||
queue_task.cancel()
|
||||
if webserver_task and not webserver_task.done():
|
||||
|
||||
@ -75,11 +75,7 @@ class AsyncQueueManager:
|
||||
|
||||
self.message_count: int = 0
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=AMQPConnectionError,
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=AMQPConnectionError, reraise=True, logger=logger)
|
||||
async def connect(self) -> None:
|
||||
logger.info("Attempting to connect to RabbitMQ...")
|
||||
self.connection = await connect(**self.config.connection_params)
|
||||
@ -94,7 +90,8 @@ class AsyncQueueManager:
|
||||
if isinstance(exc, ConnectionClosed):
|
||||
logger.warning("Connection to RabbitMQ lost. Attempting to reconnect...")
|
||||
try:
|
||||
await self.run()
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
await self.run(active_tenants=active_tenants)
|
||||
logger.debug("Reconnected to RabbitMQ successfully")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to reconnect to RabbitMQ: {e}")
|
||||
@ -113,11 +110,7 @@ class AsyncQueueManager:
|
||||
return False
|
||||
return True
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_exchanges(self) -> None:
|
||||
self.tenant_exchange = await self.channel.declare_exchange(
|
||||
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
||||
@ -129,11 +122,7 @@ class AsyncQueueManager:
|
||||
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
||||
)
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_tenant_queue(self) -> None:
|
||||
self.tenant_exchange_queue = await self.channel.declare_queue(
|
||||
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
|
||||
@ -253,11 +242,7 @@ class AsyncQueueManager:
|
||||
)
|
||||
logger.info(f"Published result to queue {tenant_id}.")
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, logger=logger)
|
||||
async def fetch_active_tenants(self) -> Set[str]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(self.tenant_service_url) as response:
|
||||
@ -273,22 +258,22 @@ class AsyncQueueManager:
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(AMQPConnectionError, ChannelInvalidStateError),
|
||||
exceptions=(
|
||||
AMQPConnectionError,
|
||||
ChannelInvalidStateError,
|
||||
),
|
||||
reraise=True,
|
||||
logger=logger,
|
||||
)
|
||||
async def initialize_tenant_queues(self) -> None:
|
||||
try:
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.warning("API calls to tenant server failed. No tenant queues initialized.", exc_info=True)
|
||||
active_tenants = set()
|
||||
async def initialize_tenant_queues(self, active_tenants: set) -> None:
|
||||
for tenant_id in active_tenants:
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
|
||||
async def run(self) -> None:
|
||||
async def run(self, active_tenants: set) -> None:
|
||||
|
||||
await self.connect()
|
||||
await self.setup_exchanges()
|
||||
await self.initialize_tenant_queues()
|
||||
await self.initialize_tenant_queues(active_tenants=active_tenants)
|
||||
await self.setup_tenant_queue()
|
||||
|
||||
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
||||
@ -300,6 +285,7 @@ class AsyncQueueManager:
|
||||
logger.debug("Cancelling queues...")
|
||||
for tenant, queue in self.tenant_queues.items():
|
||||
await queue.cancel(self.consumer_tags[tenant])
|
||||
if self.tenant_exchange_queue:
|
||||
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
|
||||
while self.message_count != 0:
|
||||
logger.debug(f"Messages are still being processed: {self.message_count=} ")
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pyinfra"
|
||||
version = "3.3.1"
|
||||
version = "3.3.2"
|
||||
description = ""
|
||||
authors = ["Team Research <research@knecon.com>"]
|
||||
license = "All rights reseverd"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user