feat: exit on ClientResponseError
This commit is contained in:
parent
ed2bd1ec86
commit
bf6f95f3e0
@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
@ -34,6 +35,8 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||
logger.info("Main task was cancelled, initiating shutdown.")
|
||||
except AMQPConnectionError as e:
|
||||
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.info("Tenant server did not answer - shutting down.")
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
|
||||
@ -276,8 +276,6 @@ class AsyncQueueManager:
|
||||
exceptions=(
|
||||
AMQPConnectionError,
|
||||
ChannelInvalidStateError,
|
||||
aiohttp.ClientResponseError,
|
||||
aiohttp.ClientConnectorError,
|
||||
),
|
||||
reraise=True,
|
||||
)
|
||||
@ -285,8 +283,9 @@ class AsyncQueueManager:
|
||||
try:
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.error("API calls to tenant server failed. No tenant queues initialized.", exc_info=True)
|
||||
raise asyncio.CancelledError
|
||||
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
|
||||
# reraise error to shutdown service
|
||||
raise
|
||||
for tenant_id in active_tenants:
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
|
||||
@ -305,7 +304,8 @@ class AsyncQueueManager:
|
||||
logger.debug("Cancelling queues...")
|
||||
for tenant, queue in self.tenant_queues.items():
|
||||
await queue.cancel(self.consumer_tags[tenant])
|
||||
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
|
||||
if self.tenant_exchange_queue:
|
||||
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
|
||||
while self.message_count != 0:
|
||||
logger.debug(f"Messages are still being processed: {self.message_count=} ")
|
||||
await asyncio.sleep(2)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user