diff --git a/pyinfra/examples.py b/pyinfra/examples.py index 4e8acd2..adbd879 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -27,8 +27,11 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host): """Run the async webserver and the async queue manager concurrently.""" queue_task = None webserver_task = None + tenant_api_available = True try: - queue_task = asyncio.create_task(manager.run(), name="queues") + active_tenants = await manager.fetch_active_tenants() + + queue_task = asyncio.create_task(manager.run(active_tenants=active_tenants), name="queues") webserver_task = asyncio.create_task(run_async_webserver(app, port, host), name="webserver") await asyncio.gather(queue_task, webserver_task) except asyncio.CancelledError: @@ -37,11 +40,14 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host): logger.warning(f"AMQPConnectionError: {e} - shutting down.") except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): logger.warning("Tenant server did not answer - shutting down.") + tenant_api_available = False except Exception as e: logger.error(f"An error occurred while running async queues: {e}", exc_info=True) sys.exit(1) finally: logger.info("Signal received, shutting down...") + if not tenant_api_available: + sys.exit(0) if queue_task and not queue_task.done(): queue_task.cancel() if webserver_task and not webserver_task.done(): diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 872d494..a69bd0a 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -93,7 +93,8 @@ class AsyncQueueManager: if isinstance(exc, ConnectionClosed): logger.warning("Connection to RabbitMQ lost. Attempting to reconnect...") try: - await self.run() + active_tenants = await self.fetch_active_tenants() + await self.run(active_tenants=active_tenants) logger.debug("Reconnected to RabbitMQ successfully") except Exception as e: logger.warning(f"Failed to reconnect to RabbitMQ: {e}") @@ -271,13 +272,7 @@ class AsyncQueueManager: for tenant_id in active_tenants: await self.create_tenant_queues(tenant_id) - async def run(self) -> None: - try: - active_tenants = await self.fetch_active_tenants() - except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): - logger.warning("API calls to tenant server failed. No tenant queues initialized.") - # reraise error to shutdown service - raise + async def run(self, active_tenants: set) -> None: await self.connect() await self.setup_exchanges()