diff --git a/pyinfra/examples.py b/pyinfra/examples.py index 6c8bf9d..568b2eb 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -1,6 +1,7 @@ import asyncio import sys +import aiohttp from aiormq.exceptions import AMQPConnectionError from dynaconf import Dynaconf from fastapi import FastAPI @@ -34,6 +35,8 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host): logger.info("Main task was cancelled, initiating shutdown.") except AMQPConnectionError as e: logger.warning(f"AMQPConnectionError: {e} - shutting down.") + except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): + logger.info("Tenant server did not answer - shutting down.") except Exception as e: logger.error(f"An error occurred while running async queues: {e}", exc_info=True) sys.exit(1) diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index b15b2df..2ea2dfb 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -276,8 +276,6 @@ class AsyncQueueManager: exceptions=( AMQPConnectionError, ChannelInvalidStateError, - aiohttp.ClientResponseError, - aiohttp.ClientConnectorError, ), reraise=True, ) @@ -285,8 +283,9 @@ class AsyncQueueManager: try: active_tenants = await self.fetch_active_tenants() except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): - logger.error("API calls to tenant server failed. No tenant queues initialized.", exc_info=True) - raise asyncio.CancelledError + logger.warning("API calls to tenant server failed. No tenant queues initialized.") + # reraise error to shutdown service + raise for tenant_id in active_tenants: await self.create_tenant_queues(tenant_id) @@ -305,7 +304,8 @@ class AsyncQueueManager: logger.debug("Cancelling queues...") for tenant, queue in self.tenant_queues.items(): await queue.cancel(self.consumer_tags[tenant]) - await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"]) + if self.tenant_exchange_queue: + await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"]) while self.message_count != 0: logger.debug(f"Messages are still being processed: {self.message_count=} ") await asyncio.sleep(2)