Merge branch 'feature/RED-10441' into 'master'
RED-10441: Fix graceful shutdown See merge request knecon/research/pyinfra!103
This commit is contained in:
commit
9b60594ce1
@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import signal
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
@ -22,12 +23,37 @@ from pyinfra.webserver.utils import (
|
|||||||
run_async_webserver,
|
run_async_webserver,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Set to True by graceful_shutdown(); run_async_queues checks it in its
# finally block to decide whether it still has to run its own cleanup.
shutdown_flag = False
|
||||||
|
|
||||||
|
|
||||||
|
async def graceful_shutdown(manager, queue_task, webserver_task):
    """Shut down the queue manager and both background tasks after a signal.

    Marks the module-level ``shutdown_flag``, cancels ``queue_task``, awaits
    ``manager.shutdown()``, cancels ``webserver_task`` and then waits until
    the cancelled tasks have actually finished.

    Args:
        manager: Queue manager whose ``shutdown()`` coroutine is awaited.
        queue_task: Task for the queue side, or ``None`` if it has not been
            created yet when the signal arrives.
        webserver_task: Task for the webserver, or ``None`` if it has not
            been created yet when the signal arrives.
    """
    global shutdown_flag
    shutdown_flag = True
    logger.info("SIGTERM received, shutting down gracefully...")

    if queue_task and not queue_task.done():
        queue_task.cancel()

    await manager.shutdown()

    if webserver_task and not webserver_task.done():
        webserver_task.cancel()

    # The signal handler can fire before either task exists; asyncio.gather()
    # raises TypeError for None, so only wait on tasks that were created.
    started_tasks = [task for task in (queue_task, webserver_task) if task is not None]
    await asyncio.gather(*started_tasks, return_exceptions=True)
    logger.info("Shutdown complete.")
|
||||||
|
|
||||||
|
|
||||||
async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||||
"""Run the async webserver and the async queue manager concurrently."""
|
"""Run the async webserver and the async queue manager concurrently."""
|
||||||
queue_task = None
|
queue_task = None
|
||||||
webserver_task = None
|
webserver_task = None
|
||||||
tenant_api_available = True
|
tenant_api_available = True
|
||||||
|
|
||||||
|
# add signal handler for SIGTERM and SIGINT
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
loop.add_signal_handler(
|
||||||
|
signal.SIGTERM, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
|
||||||
|
)
|
||||||
|
loop.add_signal_handler(
|
||||||
|
signal.SIGINT, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
active_tenants = await manager.fetch_active_tenants()
|
active_tenants = await manager.fetch_active_tenants()
|
||||||
|
|
||||||
@ -45,17 +71,21 @@ async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
|||||||
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
finally:
|
finally:
|
||||||
logger.info("Signal received, shutting down...")
|
if shutdown_flag:
|
||||||
if not tenant_api_available:
|
logger.info("Graceful shutdown already in progress.")
|
||||||
sys.exit(0)
|
else:
|
||||||
if queue_task and not queue_task.done():
|
logger.warning("Initiating shutdown due to error or manual interruption.")
|
||||||
queue_task.cancel()
|
if not tenant_api_available:
|
||||||
if webserver_task and not webserver_task.done():
|
sys.exit(0)
|
||||||
webserver_task.cancel()
|
if queue_task and not queue_task.done():
|
||||||
|
queue_task.cancel()
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
await manager.shutdown()
|
if webserver_task and not webserver_task.done():
|
||||||
|
webserver_task.cancel()
|
||||||
|
|
||||||
await asyncio.gather(queue_task, webserver_task, return_exceptions=True)
|
await asyncio.gather(queue_task, webserver_task, return_exceptions=True)
|
||||||
|
logger.info("Shutdown complete.")
|
||||||
|
|
||||||
|
|
||||||
def start_standard_queue_consumer(
|
def start_standard_queue_consumer(
|
||||||
|
|||||||
@ -16,6 +16,13 @@ from pyinfra.config.loader import validate_settings
|
|||||||
from pyinfra.config.validators import webserver_validators
|
from pyinfra.config.validators import webserver_validators
|
||||||
|
|
||||||
|
|
||||||
|
class PyInfraUvicornServer(uvicorn.Server):
    """uvicorn server that leaves signal handling to the application.

    This is a workaround to enable custom signal handlers, see
    https://github.com/encode/uvicorn/issues/1579
    """

    def install_signal_handlers(self) -> None:
        """Do nothing, so uvicorn does not install its own signal handlers.

        This is the hook older uvicorn releases call from ``serve()``.
        """

    def capture_signals(self):
        """Return a no-op context manager in place of uvicorn's signal capture.

        NOTE(review): per the uvicorn changelog, 0.29+ no longer calls
        ``install_signal_handlers`` and instead wraps serving in
        ``with self.capture_signals():`` — confirm against the pinned
        uvicorn version. Overriding both hooks keeps the workaround effective
        on either side of that change; on older releases this method is
        simply never called.
        """
        # Local import: the module's import block is not part of this class.
        from contextlib import nullcontext

        return nullcontext()
|
||||||
|
|
||||||
|
|
||||||
@retry(
|
@retry(
|
||||||
tries=5,
|
tries=5,
|
||||||
exceptions=Exception,
|
exceptions=Exception,
|
||||||
@ -53,7 +60,7 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
|
|||||||
async def run_async_webserver(app: FastAPI, port: int, host: str):
|
async def run_async_webserver(app: FastAPI, port: int, host: str):
|
||||||
"""Run the FastAPI web server async."""
|
"""Run the FastAPI web server async."""
|
||||||
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
|
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
|
||||||
server = uvicorn.Server(config)
|
server = PyInfraUvicornServer(config)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await server.serve()
|
await server.serve()
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "pyinfra"
|
name = "pyinfra"
|
||||||
version = "3.3.2"
|
version = "3.3.3"
|
||||||
description = ""
|
description = ""
|
||||||
authors = ["Team Research <research@knecon.com>"]
|
authors = ["Team Research <research@knecon.com>"]
|
||||||
license = "All rights reseverd"
|
license = "All rights reseverd"
|
||||||
|
|||||||
17
scripts/send_sigterm.py
Normal file
17
scripts/send_sigterm.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
"""Interactively send SIGTERM to a single process."""
import os
import signal
import sys
import time

# BE CAREFUL WITH THIS SCRIPT - THIS SIMULATES A SIGTERM FROM KUBERNETES


def parse_pid(raw: str) -> int:
    """Turn user input into a PID that addresses exactly one process.

    Args:
        raw: Text as typed by the user.

    Returns:
        The PID as a positive integer.

    Raises:
        ValueError: If ``raw`` is not an integer, or is zero or negative.
            os.kill() treats such values as "process group" / "every
            process" selectors instead of a single PID, which this script
            must never do by accident.
    """
    pid = int(raw.strip())
    if pid <= 0:
        raise ValueError(f"PID must be a positive integer, got {pid}")
    return pid


def main() -> int:
    """Prompt for a PID, send it SIGTERM and return the process exit code."""
    try:
        target_pid = parse_pid(input("Enter the PID of the target script: "))
    except ValueError as err:
        print(f"Invalid PID: {err}")
        return 1

    print(f"Sending SIGTERM to PID {target_pid}...")
    time.sleep(1)

    try:
        os.kill(target_pid, signal.SIGTERM)
    except ProcessLookupError:
        print("Process not found.")
    except PermissionError:
        print("Permission denied. Are you trying to signal a process you don't own?")
    else:
        print("SIGTERM sent.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
|
||||||
Loading…
x
Reference in New Issue
Block a user