fix: process message in thread in event loop

This commit is contained in:
Jonathan Kössler 2024-08-23 16:56:24 +02:00
parent a5167d1230
commit f2018f9c86
3 changed files with 24 additions and 29 deletions

View File

@ -15,17 +15,18 @@ from pyinfra.webserver.prometheus import (
)
from pyinfra.webserver.utils import (
add_health_check_endpoint,
create_webserver_task_from_settings,
create_webserver_thread_from_settings,
run_async_webserver,
)
async def run_async_queues(manager, webserver):
"""Run the webserver and the async queue manager concurrently."""
webserver_task = asyncio.create_task(webserver)
queue_manager_task = asyncio.create_task(manager.run())
await asyncio.gather(webserver_task, queue_manager_task)
async def run_async_queues(manager, app, port, host):
"""Run the async webserver and the async queue manager concurrently."""
try:
await manager.run()
await run_async_webserver(app, port, host)
except asyncio.CancelledError:
logger.info("Main task is cancelled.")
def start_standard_queue_consumer(
@ -82,8 +83,8 @@ def start_standard_queue_consumer(
app = add_health_check_endpoint(app, manager.is_ready)
if isinstance(manager, AsyncQueueManager):
webserver = create_webserver_task_from_settings(app, settings)
asyncio.run(run_async_queues(manager, webserver))
asyncio.run(run_async_queues(manager, app, port=settings.webserver.port, host=settings.webserver.host))
elif isinstance(manager, QueueManager):
webserver = create_webserver_thread_from_settings(app, settings)
webserver.start()

View File

@ -1,6 +1,8 @@
import asyncio
import concurrent.futures
import json
import signal
import sys
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Set
@ -79,7 +81,7 @@ class AsyncQueueManager:
async def is_ready(self) -> bool:
await self.connect()
return self.channel.is_initialized
return self.connection is not None and not self.connection.is_closed
async def setup_exchanges(self) -> None:
self.tenant_exchange = await self.channel.declare_exchange(
@ -100,7 +102,6 @@ class AsyncQueueManager:
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
"x-expires": self.config.queue_expiration_time,
# "x-max-priority": 2,
},
)
await self.tenant_exchange_queue.bind(self.tenant_exchange, routing_key="tenant.*")
@ -130,12 +131,6 @@ class AsyncQueueManager:
input_queue = await self.channel.declare_queue(
queue_name,
durable=True,
# arguments={
# "x-dead-letter-exchange": "",
# "x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
# "x-expires": self.config.queue_expiration_time,
# "x-max-priority": 2,
# },
)
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
@ -154,7 +149,11 @@ class AsyncQueueManager:
async def process_input_message(self, message: IncomingMessage) -> None:
async def process_message_body_and_await_result(unpacked_message_body):
return self.message_processor(unpacked_message_body)
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
logger.info("Processing payload in a separate thread.")
result = await loop.run_in_executor(thread_pool_executor, self.message_processor, unpacked_message_body)
return result
async with message.process(ignore_processed=True):
if message.redelivered:
@ -281,3 +280,4 @@ class AsyncQueueManager:
if self.connection:
await self.connection.close()
logger.info("RabbitMQ handler shut down successfully.")
sys.exit(0)

View File

@ -1,4 +1,3 @@
import asyncio
import inspect
import logging
import threading
@ -26,17 +25,12 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
return thread
async def create_webserver_task_from_settings(app: FastAPI, settings: Dynaconf) -> asyncio.Task:
validate_settings(settings, validators=webserver_validators)
return await create_webserver_task(app=app, port=settings.webserver.port, host=settings.webserver.host)
async def create_webserver_task(app: FastAPI, port: int, host: str) -> asyncio.Task:
"""Creates an asyncio task that runs a FastAPI webserver."""
config = uvicorn.Config(app=app, host=host, port=port, log_level=logging.WARNING)
async def run_async_webserver(app: FastAPI, port: int, host: str):
"""Run the FastAPI web server async."""
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
server = uvicorn.Server(config)
task = asyncio.create_task(server.serve())
return task
await server.serve()
HealthFunction = Callable[[], bool]