diff --git a/pyinfra/examples.py b/pyinfra/examples.py index 568b2eb..4e8acd2 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -36,7 +36,7 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host): except AMQPConnectionError as e: logger.warning(f"AMQPConnectionError: {e} - shutting down.") except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): - logger.info("Tenant server did not answer - shutting down.") + logger.warning("Tenant server did not answer - shutting down.") except Exception as e: logger.error(f"An error occurred while running async queues: {e}", exc_info=True) sys.exit(1) diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 2ea2dfb..b5f9a4f 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -279,20 +279,21 @@ class AsyncQueueManager: ), reraise=True, ) - async def initialize_tenant_queues(self) -> None: + async def initialize_tenant_queues(self, active_tenants: set) -> None: + for tenant_id in active_tenants: + await self.create_tenant_queues(tenant_id) + + async def run(self) -> None: try: active_tenants = await self.fetch_active_tenants() except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): logger.warning("API calls to tenant server failed. No tenant queues initialized.") # reraise error to shutdown service raise - for tenant_id in active_tenants: - await self.create_tenant_queues(tenant_id) - async def run(self) -> None: await self.connect() await self.setup_exchanges() - await self.initialize_tenant_queues() + await self.initialize_tenant_queues(active_tenants=active_tenants) await self.setup_tenant_queue() logger.info("RabbitMQ handler is running. Press CTRL+C to exit.") diff --git a/pyproject.toml b/pyproject.toml index 60ac4a1..73d8675 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyinfra" -version = "3.3.1" +version = "3.3.2" description = "" authors = ["Team Research "] license = "All rights reseverd"