Merge branch 'bugfix/RES-834-service-disconnects' into 'master'

fix: pod restarts due to health check

See merge request knecon/research/pyinfra!92
This commit is contained in:
Jonathan Kössler 2024-08-26 15:10:51 +02:00
commit f626ef2e6f
4 changed files with 43 additions and 42 deletions

View File

@ -15,17 +15,21 @@ from pyinfra.webserver.prometheus import (
)
from pyinfra.webserver.utils import (
add_health_check_endpoint,
create_webserver_task_from_settings,
create_webserver_thread_from_settings,
run_async_webserver,
)
async def run_async_queues(manager, webserver):
"""Run the webserver and the async queue manager concurrently."""
webserver_task = asyncio.create_task(webserver)
queue_manager_task = asyncio.create_task(manager.run())
await asyncio.gather(webserver_task, queue_manager_task)
async def run_async_queues(manager, app, port, host):
"""Run the async webserver and the async queue manager concurrently."""
try:
await manager.run()
await run_async_webserver(app, port, host)
except asyncio.CancelledError:
logger.info("Main task is cancelled.")
finally:
logger.info("Signal received, shutting down...")
await manager.shutdown()
def start_standard_queue_consumer(
@ -82,8 +86,8 @@ def start_standard_queue_consumer(
app = add_health_check_endpoint(app, manager.is_ready)
if isinstance(manager, AsyncQueueManager):
webserver = create_webserver_task_from_settings(app, settings)
asyncio.run(run_async_queues(manager, webserver))
asyncio.run(run_async_queues(manager, app, port=settings.webserver.port, host=settings.webserver.host))
elif isinstance(manager, QueueManager):
webserver = create_webserver_thread_from_settings(app, settings)
webserver.start()

View File

@ -1,6 +1,8 @@
import asyncio
import concurrent.futures
import json
import signal
import sys
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Set
@ -79,7 +81,7 @@ class AsyncQueueManager:
async def is_ready(self) -> bool:
await self.connect()
return self.channel.is_initialized
return self.connection is not None and not self.connection.is_closed
async def setup_exchanges(self) -> None:
self.tenant_exchange = await self.channel.declare_exchange(
@ -100,7 +102,6 @@ class AsyncQueueManager:
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
"x-expires": self.config.queue_expiration_time,
# "x-max-priority": 2,
},
)
await self.tenant_exchange_queue.bind(self.tenant_exchange, routing_key="tenant.*")
@ -130,12 +131,6 @@ class AsyncQueueManager:
input_queue = await self.channel.declare_queue(
queue_name,
durable=True,
# arguments={
# "x-dead-letter-exchange": "",
# "x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
# "x-expires": self.config.queue_expiration_time,
# "x-max-priority": 2,
# },
)
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
@ -154,7 +149,11 @@ class AsyncQueueManager:
async def process_input_message(self, message: IncomingMessage) -> None:
async def process_message_body_and_await_result(unpacked_message_body):
return self.message_processor(unpacked_message_body)
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
logger.info("Processing payload in a separate thread.")
result = await loop.run_in_executor(thread_pool_executor, self.message_processor, unpacked_message_body)
return result
async with message.process(ignore_processed=True):
if message.redelivered:
@ -241,16 +240,6 @@ class AsyncQueueManager:
await self.create_tenant_queues(tenant_id)
async def run(self) -> None:
stop = asyncio.Event()
def signal_handler(*_):
logger.info("Signal received, shutting down...")
stop.set()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, signal_handler)
try:
await self.connect()
await self.setup_exchanges()
@ -258,13 +247,10 @@ class AsyncQueueManager:
await self.setup_tenant_queue()
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
await stop.wait() # Run until stop signal received
except asyncio.CancelledError:
logger.warning("Operation cancelled.")
except Exception as e:
logger.error(f"An error occurred: {e}", exc_info=True)
finally:
await self.shutdown()
async def shutdown(self) -> None:
logger.info("Shutting down RabbitMQ handler...")
@ -281,3 +267,4 @@ class AsyncQueueManager:
if self.connection:
await self.connection.close()
logger.info("RabbitMQ handler shut down successfully.")
sys.exit(0)

View File

@ -1,12 +1,14 @@
import asyncio
import inspect
import logging
import signal
import threading
from typing import Callable
import uvicorn
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import webserver_validators
@ -26,17 +28,25 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
return thread
async def create_webserver_task_from_settings(app: FastAPI, settings: Dynaconf) -> asyncio.Task:
validate_settings(settings, validators=webserver_validators)
return await create_webserver_task(app=app, port=settings.webserver.port, host=settings.webserver.host)
async def create_webserver_task(app: FastAPI, port: int, host: str) -> asyncio.Task:
"""Creates an asyncio task that runs a FastAPI webserver."""
config = uvicorn.Config(app=app, host=host, port=port, log_level=logging.WARNING)
async def run_async_webserver(app: FastAPI, port: int, host: str):
"""Run the FastAPI web server async."""
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
server = uvicorn.Server(config)
task = asyncio.create_task(server.serve())
return task
async def shutdown(signal):
logger.info(f"Received signal {signal.name}, shutting down webserver...")
await app.shutdown()
await app.cleanup()
logger.info("Shutdown complete.")
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s)))
try:
await server.serve()
except asyncio.CancelledError:
pass
HealthFunction = Callable[[], bool]

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pyinfra"
version = "3.2.3"
version = "3.2.4"
description = ""
authors = ["Team Research <research@knecon.com>"]
license = "All rights reseverd"