refactor: fetch active tenants before start

This commit is contained in:
Jonathan Kössler 2024-11-12 17:11:33 +01:00
parent 86af05c12c
commit 04c90533b6
2 changed files with 10 additions and 9 deletions

View File

@ -27,8 +27,11 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
"""Run the async webserver and the async queue manager concurrently."""
queue_task = None
webserver_task = None
tenant_api_available = True
try:
queue_task = asyncio.create_task(manager.run(), name="queues")
active_tenants = await manager.fetch_active_tenants()
queue_task = asyncio.create_task(manager.run(active_tenants=active_tenants), name="queues")
webserver_task = asyncio.create_task(run_async_webserver(app, port, host), name="webserver")
await asyncio.gather(queue_task, webserver_task)
except asyncio.CancelledError:
@ -37,11 +40,14 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
logger.warning("Tenant server did not answer - shutting down.")
tenant_api_available = False
except Exception as e:
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
sys.exit(1)
finally:
logger.info("Signal received, shutting down...")
if not tenant_api_available:
sys.exit(0)
if queue_task and not queue_task.done():
queue_task.cancel()
if webserver_task and not webserver_task.done():

View File

@ -93,7 +93,8 @@ class AsyncQueueManager:
if isinstance(exc, ConnectionClosed):
logger.warning("Connection to RabbitMQ lost. Attempting to reconnect...")
try:
await self.run()
active_tenants = await self.fetch_active_tenants()
await self.run(active_tenants=active_tenants)
logger.debug("Reconnected to RabbitMQ successfully")
except Exception as e:
logger.warning(f"Failed to reconnect to RabbitMQ: {e}")
@ -271,13 +272,7 @@ class AsyncQueueManager:
for tenant_id in active_tenants:
await self.create_tenant_queues(tenant_id)
async def run(self) -> None:
try:
active_tenants = await self.fetch_active_tenants()
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
# reraise error to shutdown service
raise
async def run(self, active_tenants: set) -> None:
await self.connect()
await self.setup_exchanges()