From 1540c2894e243d02bd732b9ee9d16eda1ea86dc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Wed, 13 Nov 2024 16:30:18 +0100 Subject: [PATCH] feat: ensure shutdown of queue manager --- pyinfra/examples.py | 15 +++++++++------ pyinfra/queue/async_manager.py | 4 ++-- pyinfra/webserver/utils.py | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pyinfra/examples.py b/pyinfra/examples.py index 036cbe5..407f389 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -26,16 +26,20 @@ from pyinfra.webserver.utils import ( shutdown_flag = False -async def graceful_shutdown(manager, queue_task, webserver_task): +async def graceful_shutdown(manager: AsyncQueueManager, queue_task, webserver_task): global shutdown_flag shutdown_flag = True logger.info("SIGTERM received, shutting down gracefully...") + if queue_task and not queue_task.done(): queue_task.cancel() - await manager.shutdown() + if webserver_task and not webserver_task.done(): webserver_task.cancel() - await asyncio.gather(queue_task, webserver_task, return_exceptions=True) + + # explicitly shutdown manager and webserver + await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True) + logger.info("Shutdown complete.") @@ -72,19 +76,18 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host): sys.exit(1) finally: if shutdown_flag: - logger.info("Graceful shutdown already in progress.") + logger.debug("Graceful shutdown already in progress.") else: logger.warning("Initiating shutdown due to error or manual interruption.") if not tenant_api_available: sys.exit(0) if queue_task and not queue_task.done(): queue_task.cancel() - await manager.shutdown() if webserver_task and not webserver_task.done(): webserver_task.cancel() - await asyncio.gather(queue_task, webserver_task, return_exceptions=True) + await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True) logger.info("Shutdown complete.") diff --git a/pyinfra/queue/async_manager.py b/pyinfra/queue/async_manager.py index 2d41cfc..6789324 100644 --- a/pyinfra/queue/async_manager.py +++ b/pyinfra/queue/async_manager.py @@ -290,7 +290,7 @@ class AsyncQueueManager: while self.message_count != 0: logger.debug(f"Messages are still being processed: {self.message_count=} ") await asyncio.sleep(2) - await self.channel.close() + await self.channel.close(exc=asyncio.CancelledError) logger.debug("Channel closed.") else: logger.debug("No channel to close.") @@ -304,7 +304,7 @@ class AsyncQueueManager: async def close_connection(self) -> None: try: if self.connection and not self.connection.is_closed: - await self.connection.close() + await self.connection.close(exc=asyncio.CancelledError) logger.debug("Connection closed.") else: logger.debug("No connection to close.") diff --git a/pyinfra/webserver/utils.py b/pyinfra/webserver/utils.py index 0db6048..4def5e5 100644 --- a/pyinfra/webserver/utils.py +++ b/pyinfra/webserver/utils.py @@ -65,7 +65,7 @@ async def run_async_webserver(app: FastAPI, port: int, host: str): try: await server.serve() except asyncio.CancelledError: - logger.info("Webserver was cancelled.") + logger.debug("Webserver was cancelled.") server.should_exit = True await server.shutdown() except Exception as e: