feat: ensure shutdown of queue manager
This commit is contained in:
parent
3d3c76b466
commit
1540c2894e
@ -26,16 +26,20 @@ from pyinfra.webserver.utils import (
|
||||
shutdown_flag = False
|
||||
|
||||
|
||||
async def graceful_shutdown(manager, queue_task, webserver_task):
|
||||
async def graceful_shutdown(manager: AsyncQueueManager, queue_task, webserver_task):
|
||||
global shutdown_flag
|
||||
shutdown_flag = True
|
||||
logger.info("SIGTERM received, shutting down gracefully...")
|
||||
|
||||
if queue_task and not queue_task.done():
|
||||
queue_task.cancel()
|
||||
await manager.shutdown()
|
||||
|
||||
if webserver_task and not webserver_task.done():
|
||||
webserver_task.cancel()
|
||||
await asyncio.gather(queue_task, webserver_task, return_exceptions=True)
|
||||
|
||||
# explicitly shutdown manager and webserver
|
||||
await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True)
|
||||
|
||||
logger.info("Shutdown complete.")
|
||||
|
||||
|
||||
@ -72,19 +76,18 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||
sys.exit(1)
|
||||
finally:
|
||||
if shutdown_flag:
|
||||
logger.info("Graceful shutdown already in progress.")
|
||||
logger.debug("Graceful shutdown already in progress.")
|
||||
else:
|
||||
logger.warning("Initiating shutdown due to error or manual interruption.")
|
||||
if not tenant_api_available:
|
||||
sys.exit(0)
|
||||
if queue_task and not queue_task.done():
|
||||
queue_task.cancel()
|
||||
await manager.shutdown()
|
||||
|
||||
if webserver_task and not webserver_task.done():
|
||||
webserver_task.cancel()
|
||||
|
||||
await asyncio.gather(queue_task, webserver_task, return_exceptions=True)
|
||||
await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True)
|
||||
logger.info("Shutdown complete.")
|
||||
|
||||
|
||||
|
||||
@ -290,7 +290,7 @@ class AsyncQueueManager:
|
||||
while self.message_count != 0:
|
||||
logger.debug(f"Messages are still being processed: {self.message_count=} ")
|
||||
await asyncio.sleep(2)
|
||||
await self.channel.close()
|
||||
await self.channel.close(exc=asyncio.CancelledError)
|
||||
logger.debug("Channel closed.")
|
||||
else:
|
||||
logger.debug("No channel to close.")
|
||||
@ -304,7 +304,7 @@ class AsyncQueueManager:
|
||||
async def close_connection(self) -> None:
|
||||
try:
|
||||
if self.connection and not self.connection.is_closed:
|
||||
await self.connection.close()
|
||||
await self.connection.close(exc=asyncio.CancelledError)
|
||||
logger.debug("Connection closed.")
|
||||
else:
|
||||
logger.debug("No connection to close.")
|
||||
|
||||
@ -65,7 +65,7 @@ async def run_async_webserver(app: FastAPI, port: int, host: str):
|
||||
try:
|
||||
await server.serve()
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Webserver was cancelled.")
|
||||
logger.debug("Webserver was cancelled.")
|
||||
server.should_exit = True
|
||||
await server.shutdown()
|
||||
except Exception as e:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user