refactor: tenant queues init
This commit is contained in:
parent
bf6f95f3e0
commit
c6e336cb35
@ -36,7 +36,7 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||
except AMQPConnectionError as e:
|
||||
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.info("Tenant server did not answer - shutting down.")
|
||||
logger.warning("Tenant server did not answer - shutting down.")
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
|
||||
@ -279,20 +279,21 @@ class AsyncQueueManager:
|
||||
),
|
||||
reraise=True,
|
||||
)
|
||||
async def initialize_tenant_queues(self) -> None:
|
||||
async def initialize_tenant_queues(self, active_tenants: set) -> None:
|
||||
for tenant_id in active_tenants:
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
|
||||
async def run(self) -> None:
|
||||
try:
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
|
||||
# reraise error to shutdown service
|
||||
raise
|
||||
for tenant_id in active_tenants:
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
|
||||
async def run(self) -> None:
|
||||
await self.connect()
|
||||
await self.setup_exchanges()
|
||||
await self.initialize_tenant_queues()
|
||||
await self.initialize_tenant_queues(active_tenants=active_tenants)
|
||||
await self.setup_tenant_queue()
|
||||
|
||||
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pyinfra"
|
||||
version = "3.3.1"
|
||||
version = "3.3.2"
|
||||
description = ""
|
||||
authors = ["Team Research <research@knecon.com>"]
|
||||
license = "All rights reseverd"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user