Merge branch 'master' into feature/RES-856-test-proto-format

This commit is contained in:
Jonathan Kössler 2024-09-26 09:39:28 +02:00
commit 9669152e14
6 changed files with 69 additions and 28 deletions

View File

@@ -1 +1 @@
3.10.12
3.10

21
poetry.lock generated
View File

@@ -3148,6 +3148,25 @@ opentelemetry-api = ">=1.4,<2.0"
setuptools = ">=16.0"
wrapt = ">=1.0.0,<2.0.0"
[[package]]
name = "opentelemetry-instrumentation-aio-pika"
version = "0.46b0"
description = "OpenTelemetry Aio-pika instrumentation"
optional = false
python-versions = ">=3.8"
files = [
{file = "opentelemetry_instrumentation_aio_pika-0.46b0-py3-none-any.whl", hash = "sha256:835c8601f67570130cf140518c5f0aad3ce9dbed46979a65588c82c39b37139f"},
{file = "opentelemetry_instrumentation_aio_pika-0.46b0.tar.gz", hash = "sha256:8516c155586d01c46995127fa8ce9435b74fb2a02fa0e25f7605a4fbe0ae1456"},
]
[package.dependencies]
opentelemetry-api = ">=1.5,<2.0"
opentelemetry-instrumentation = "0.46b0"
wrapt = ">=1.0.0,<2.0.0"
[package.extras]
instruments = ["aio-pika (>=7.2.0,<10.0.0)"]
[[package]]
name = "opentelemetry-instrumentation-asgi"
version = "0.46b0"
@@ -5405,4 +5424,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.11"
content-hash = "2b8ced3cbb432e1a39c0ab58685ff8361265dab851702f8f8a41b94e9072f505"
content-hash = "85cc4f846ba584e4b4d41a1e07ad11eeea96ce3377f3137cf5efaf8de4716050"

View File

@@ -1,14 +1,20 @@
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import sys
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
from pyinfra.queue.manager import QueueManager
from pyinfra.queue.callback import Callback
from pyinfra.queue.manager import QueueManager
from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace
from pyinfra.webserver.prometheus import (
add_prometheus_endpoint,
@@ -34,6 +40,9 @@ async def run_async_queues(manager, app, port, host):
await run_async_webserver(app, port, host)
except asyncio.CancelledError:
logger.info("Main task is cancelled.")
except Exception as e:
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
sys.exit(1)
finally:
logger.info("Signal received, shutting down...")
await manager.shutdown()
@@ -80,10 +89,11 @@ def start_standard_queue_consumer(
if settings.tracing.enabled:
setup_trace(settings)
instrument_pika()
instrument_pika(dynamic_queues=settings.dynamic_tenant_queues.enabled)
instrument_app(app)
if settings.dynamic_tenant_queues.enabled:
logger.info("Dynamic tenant queues enabled. Running async queues.")
config = RabbitMQConfig(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
@@ -100,9 +110,15 @@ def start_standard_queue_consumer(
pod_name=settings.kubernetes.pod_name,
)
manager = AsyncQueueManager(
config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback
config=config,
tenant_service_url=settings.storage.tenant_server.endpoint,
message_processor=callback,
max_concurrent_tasks=(
settings.asyncio.max_concurrent_tasks if hasattr(settings.asyncio, "max_concurrent_tasks") else 10
),
)
else:
logger.info("Dynamic tenant queues disabled. Running sync queues.")
manager = QueueManager(settings)
app = add_health_check_endpoint(app, manager.is_ready)
@@ -116,9 +132,7 @@ def start_standard_queue_consumer(
try:
manager.start_consuming(callback)
except Exception as e:
logger.error(f"An error occurred while consuming messages: {e}")
# Optionally, you can choose to exit here if you want to restart the process
# import sys
# sys.exit(1)
logger.error(f"An error occurred while consuming messages: {e}", exc_info=True)
sys.exit(1)
else:
logger.warning(f"Behavior for type {type(manager)} is not defined")

View File

@@ -15,17 +15,16 @@ from aio_pika.abc import (
AbstractIncomingMessage,
AbstractQueue,
)
from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError
from aiormq.exceptions import AMQPConnectionError
from kn_utils.logging import logger
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
wait_exponential,
retry_if_exception_type,
wait_exponential_jitter,
)
from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError
from aiormq.exceptions import AMQPConnectionError
@dataclass
@@ -62,10 +61,12 @@ class AsyncQueueManager:
config: RabbitMQConfig,
tenant_service_url: str,
message_processor: Callable[[Dict[str, Any]], Dict[str, Any]],
max_concurrent_tasks: int = 10,
):
self.config = config
self.tenant_service_url = tenant_service_url
self.message_processor = message_processor
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
self.connection: AbstractConnection | None = None
self.channel: AbstractChannel | None = None
@@ -178,11 +179,14 @@
async def process_input_message(self, message: IncomingMessage) -> None:
async def process_message_body_and_await_result(unpacked_message_body):
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
logger.info("Processing payload in a separate thread.")
result = await loop.run_in_executor(thread_pool_executor, self.message_processor, unpacked_message_body)
return result
async with self.semaphore:
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
logger.info("Processing payload in a separate thread.")
result = await loop.run_in_executor(
thread_pool_executor, self.message_processor, unpacked_message_body
)
return result
async with message.process(ignore_processed=True):
if message.redelivered:
@@ -222,14 +226,13 @@
except json.JSONDecodeError:
await message.nack(requeue=False)
logger.error(f"Invalid JSON in input message: {message.body}")
logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
except FileNotFoundError as e:
logger.warning(f"{e}, declining message with {message.delivery_tag=}.")
logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
await message.nack(requeue=False)
except Exception as e:
await message.nack(requeue=False)
logger.error(f"Error processing input message: {e}", exc_info=True)
raise
finally:
self.message_count -= 1
@@ -269,7 +272,7 @@
try:
active_tenants = await self.fetch_active_tenants()
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
logger.warning("API calls to tenant server failed. No tenant queues initialized.", exc_info=True)
active_tenants = set()
for tenant_id in active_tenants:
await self.create_tenant_queues(tenant_id)
@@ -283,7 +286,7 @@
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
except AMQPConnectionError as e:
logger.error(f"Failed to establish connection to RabbitMQ: {e}")
logger.error(f"Failed to establish connection to RabbitMQ: {e}", exc_info=True)
# TODO: implement a custom exception handling strategy here
except asyncio.CancelledError:
logger.warning("Operation cancelled.")

View File

@@ -3,8 +3,10 @@ import json
from azure.monitor.opentelemetry import configure_azure_monitor
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.resources import Resource
@@ -18,7 +20,6 @@ from opentelemetry.sdk.trace.export import (
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import opentelemetry_validators
from kn_utils.logging import logger
class JsonSpanExporter(SpanExporter):
@@ -37,7 +38,7 @@ class JsonSpanExporter(SpanExporter):
def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExporter = None):
tracing_type = settings.tracing.type
if tracing_type == "azure_monitor":
# Configure OpenTelemetry to use Azure Monitor with the
# Configure OpenTelemetry to use Azure Monitor with the
# APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.
try:
configure_azure_monitor()
@@ -84,8 +85,11 @@ def get_exporter(settings: Dynaconf):
)
def instrument_pika():
PikaInstrumentor().instrument()
def instrument_pika(dynamic_queues: bool):
if dynamic_queues:
AioPikaInstrumentor().instrument()
else:
PikaInstrumentor().instrument()
def instrument_app(app: FastAPI, excluded_urls: str = "/health,/ready,/prometheus"):

View File

@@ -40,6 +40,7 @@ protobuf = ">=3.20 <5.0.0"
aio-pika = "^9.4.2"
aiohttp = "^3.9.5"
tenacity = "^8.5.0"
opentelemetry-instrumentation-aio-pika = "0.46b0"
[tool.poetry.group.dev.dependencies]
pytest = "^7"