Merge branch 'feature/RES-731-add-queues-per-tenant' into 'master'
feat: refractor to work asynchronously See merge request knecon/research/pyinfra!86
This commit is contained in:
commit
8f1ad1a4bd
@ -1,42 +1,54 @@
|
||||
# See https://pre-commit.com for more information
|
||||
# See https://pre-commit.com/hooks.html for more hooks
|
||||
exclude: ^(docs/|notebooks/|data/|src/secrets/|src/static/|src/templates/|tests)
|
||||
exclude: ^(docs/|notebooks/|data/|src/configs/|tests/|.hooks/)
|
||||
default_language_version:
|
||||
python: python3.10
|
||||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v4.5.0
|
||||
rev: v4.6.0
|
||||
hooks:
|
||||
- id: trailing-whitespace
|
||||
- id: end-of-file-fixer
|
||||
- id: check-yaml
|
||||
exclude: bamboo-specs/bamboo.yml
|
||||
name: Check Gitlab CI (unsafe)
|
||||
args: [--unsafe]
|
||||
files: .gitlab-ci.yml
|
||||
- id: check-yaml
|
||||
exclude: .gitlab-ci.yml
|
||||
- id: check-toml
|
||||
- id: detect-private-key
|
||||
- id: check-added-large-files
|
||||
args: ['--maxkb=10000']
|
||||
- id: check-case-conflict
|
||||
- id: mixed-line-ending
|
||||
|
||||
# - repo: https://github.com/pycqa/pylint
|
||||
# rev: v2.16.1
|
||||
# hooks:
|
||||
# - id: pylint
|
||||
# args:
|
||||
# ["--max-line-length=120", "--errors-only", "--ignore-imports=true", ]
|
||||
- repo: https://github.com/pre-commit/mirrors-pylint
|
||||
rev: v3.0.0a5
|
||||
hooks:
|
||||
- id: pylint
|
||||
args:
|
||||
- --disable=C0111,R0903
|
||||
- --max-line-length=120
|
||||
|
||||
- repo: https://github.com/pre-commit/mirrors-isort
|
||||
rev: v5.10.1
|
||||
hooks:
|
||||
- id: isort
|
||||
args: ["--profile", "black"]
|
||||
args:
|
||||
- --profile black
|
||||
|
||||
- repo: https://github.com/psf/black
|
||||
rev: 23.12.1
|
||||
rev: 24.4.2
|
||||
hooks:
|
||||
- id: black
|
||||
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
|
||||
args:
|
||||
- --line-length=120
|
||||
# - repo: local
|
||||
# hooks:
|
||||
# - id: system
|
||||
# name: PyLint
|
||||
# entry: poetry run pylint
|
||||
# language: system
|
||||
# exclude: ^alembic/
|
||||
# files: \.py$
|
||||
|
||||
- repo: https://github.com/compilerla/conventional-pre-commit
|
||||
rev: v3.2.0
|
||||
hooks:
|
||||
- id: conventional-pre-commit
|
||||
pass_filenames: false
|
||||
stages: [commit-msg]
|
||||
# args: [] # optional: list of Conventional Commits types to allow e.g. [feat, fix, ci, chore, test]
|
||||
|
||||
71
README.md
71
README.md
@ -30,34 +30,47 @@ The following table shows all necessary settings. You can find a preconfigured s
|
||||
bitbucket. These are the complete settings, you only need all if using all features of the service as described in
|
||||
the [complete example](pyinfra/examples.py).
|
||||
|
||||
| Environment Variable | Internal / .toml Name | Description |
|
||||
|--------------------------------------|------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| LOGGING__LEVEL | logging.level | Log level |
|
||||
| METRICS__PROMETHEUS__ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
|
||||
| METRICS__PROMETHEUS__PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
|
||||
| WEBSERVER__HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
|
||||
| WEBSERVER__PORT | webserver.port | Port of the webserver |
|
||||
| RABBITMQ__HOST | rabbitmq.host | Host of the RabbitMQ server |
|
||||
| RABBITMQ__PORT | rabbitmq.port | Port of the RabbitMQ server |
|
||||
| RABBITMQ__USERNAME | rabbitmq.username | Username for the RabbitMQ server |
|
||||
| RABBITMQ__PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
|
||||
| RABBITMQ__HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
|
||||
| RABBITMQ__CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep time intervals during message processing. Has to be a divider of heartbeat, and shouldn't be too big, since only in these intervals queue interactions happen (like receiving new messages) This is also the minimum time the service needs to process a message. |
|
||||
| RABBITMQ__INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue |
|
||||
| RABBITMQ__OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue |
|
||||
| RABBITMQ__DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue |
|
||||
| STORAGE__BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
|
||||
| STORAGE__S3__BUCKET | storage.s3.bucket | Name of the S3 bucket |
|
||||
| STORAGE__S3__ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
|
||||
| STORAGE__S3__KEY | storage.s3.key | Access key for the S3 server |
|
||||
| STORAGE__S3__SECRET | storage.s3.secret | Secret key for the S3 server |
|
||||
| STORAGE__S3__REGION | storage.s3.region | Region of the S3 server |
|
||||
| STORAGE__AZURE__CONTAINER | storage.azure.container_name | Name of the Azure container |
|
||||
| STORAGE__AZURE__CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
|
||||
| STORAGE__TENANT_SERVER__PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
|
||||
| STORAGE__TENANT_SERVER__ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
|
||||
| TRACING__OPENTELEMETRY__ENDPOINT | tracing.opentelemetry.endpoint | Endpoint to which OpenTelemetry traces are exported
|
||||
| TRACING__OPENTELEMETRY__SERVICE_NAME | tracing.opentelemetry.service_name | Name of the service as displayed in the traces collected
|
||||
| Environment Variable | Internal / .toml Name | Description |
|
||||
| ------------------------------------------ | --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| LOGGING\_\_LEVEL | logging.level | Log level |
|
||||
| CONCURRENCY\_\_ENABLED | concurrency.enabled | Enable multi tenant queue mode |
|
||||
| METRICS\_\_PROMETHEUS\_\_ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
|
||||
| METRICS\_\_PROMETHEUS\_\_PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
|
||||
| WEBSERVER\_\_HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
|
||||
| WEBSERVER\_\_PORT | webserver.port | Port of the webserver |
|
||||
| RABBITMQ\_\_HOST | rabbitmq.host | Host of the RabbitMQ server |
|
||||
| RABBITMQ\_\_PORT | rabbitmq.port | Port of the RabbitMQ server |
|
||||
| RABBITMQ\_\_USERNAME | rabbitmq.username | Username for the RabbitMQ server |
|
||||
| RABBITMQ\_\_PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
|
||||
| RABBITMQ\_\_HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
|
||||
| RABBITMQ\_\_CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep time intervals during message processing. Has to be a divider of heartbeat, and shouldn't be too big, since only in these intervals queue interactions happen (like receiving new messages) This is also the minimum time the service needs to process a message. |
|
||||
| RABBITMQ\_\_INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue in single queue setting |
|
||||
| RABBITMQ\_\_OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue in single queue setting |
|
||||
| RABBITMQ\_\_DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue in single queue setting |
|
||||
| RABBITMQ\_\_TENANT_EVENT_QUEUE_SUFFIX | rabbitmq.tenant_event_queue_suffix | Suffix for the tenant event queue in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_TENANT_EVENT_DLQ_SUFFIX | rabbitmq.tenant_event_dlq_suffix | Suffix for the dead letter queue in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_TENANT_EXCHANGE_NAME | rabbitmq.tenant_exchange_name | Name of tenant exchange in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_QUEUE_EXPIRATION_TIME | rabbitmq.queue_expiration_time | Time until queue expiration in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_SERVICE_REQUEST_QUEUE_PREFIX | rabbitmq.service_request_queue_prefix | Service request queue prefix in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_SERVICE_REQUEST_EXCHANGE_NAME | rabbitmq.service_request_exchange_name | Service request exchange name in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_SERVICE_RESPONSE_EXCHANGE_NAME | rabbitmq.service_response_exchange_name | Service response exchange name in multi tenant/queue setting |
|
||||
| RABBITMQ\_\_SERVICE_DLQ_NAME | rabbitmq.service_dlq_name | Service dead letter queue name in multi tenant/queue setting |
|
||||
| STORAGE\_\_BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
|
||||
| STORAGE\_\_S3\_\_BUCKET | storage.s3.bucket | Name of the S3 bucket |
|
||||
| STORAGE\_\_S3\_\_ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
|
||||
| STORAGE\_\_S3\_\_KEY | storage.s3.key | Access key for the S3 server |
|
||||
| STORAGE\_\_S3\_\_SECRET | storage.s3.secret | Secret key for the S3 server |
|
||||
| STORAGE\_\_S3\_\_REGION | storage.s3.region | Region of the S3 server |
|
||||
| STORAGE\_\_AZURE\_\_CONTAINER | storage.azure.container_name | Name of the Azure container |
|
||||
| STORAGE\_\_AZURE\_\_CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
|
||||
| STORAGE\_\_TENANT_SERVER\_\_PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
|
||||
| STORAGE\_\_TENANT_SERVER\_\_ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
|
||||
| TRACING\_\_ENABLED | tracing.enabled | Enable tracing |
|
||||
| TRACING\_\_TYPE | tracing.type | Tracing mode - possible values: "opentelemetry", "azure_monitor" (Excpects APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.) |
|
||||
| TRACING\_\_OPENTELEMETRY\_\_ENDPOINT | tracing.opentelemetry.endpoint | Endpoint to which OpenTelemetry traces are exported |
|
||||
| TRACING\_\_OPENTELEMETRY\_\_SERVICE_NAME | tracing.opentelemetry.service_name | Name of the service as displayed in the traces collected |
|
||||
| TRACING\_\_OPENTELEMETRY\_\_EXPORTER | tracing.opentelemetry.exporter | Name of exporter |
|
||||
| KUBERNETES\_\_POD_NAME | kubernetes.pod_name | Service pod name |
|
||||
|
||||
### OpenTelemetry
|
||||
|
||||
@ -115,10 +128,8 @@ callback = make_download_process_upload_callback(processing_function, settings)
|
||||
start_standard_queue_consumer(callback, settings) # optionally also pass a fastAPI app object with preconfigured routes
|
||||
```
|
||||
|
||||
|
||||
### AMQP input message:
|
||||
|
||||
|
||||
Either use the legacy format with dossierId and fileId as strings or the new format where absolute paths are used.
|
||||
All headers beginning with "X-" are forwarded to the message processor, and returned in the response message (e.g.
|
||||
"X-TENANT-ID" is used to acquire storage information for the tenant).
|
||||
|
||||
1217
poetry.lock
generated
1217
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,11 +1,14 @@
|
||||
import asyncio
|
||||
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
|
||||
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
|
||||
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
|
||||
from pyinfra.queue.callback import Callback
|
||||
from pyinfra.queue.manager import QueueManager
|
||||
from pyinfra.utils.opentelemetry import instrument_pika, setup_trace, instrument_app
|
||||
from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace
|
||||
from pyinfra.webserver.prometheus import (
|
||||
add_prometheus_endpoint,
|
||||
make_prometheus_processing_time_decorator_from_settings,
|
||||
@ -34,8 +37,6 @@ def start_standard_queue_consumer(
|
||||
|
||||
app = app or FastAPI()
|
||||
|
||||
queue_manager = QueueManager(settings)
|
||||
|
||||
if settings.metrics.prometheus.enabled:
|
||||
logger.info("Prometheus metrics enabled.")
|
||||
app = add_prometheus_endpoint(app)
|
||||
@ -43,13 +44,40 @@ def start_standard_queue_consumer(
|
||||
|
||||
if settings.tracing.enabled:
|
||||
setup_trace(settings)
|
||||
|
||||
|
||||
instrument_pika()
|
||||
instrument_app(app)
|
||||
|
||||
app = add_health_check_endpoint(app, queue_manager.is_ready)
|
||||
|
||||
if settings.concurrency.enabled:
|
||||
config = RabbitMQConfig(
|
||||
host=settings.rabbitmq.host,
|
||||
port=settings.rabbitmq.port,
|
||||
username=settings.rabbitmq.username,
|
||||
password=settings.rabbitmq.password,
|
||||
heartbeat=settings.rabbitmq.heartbeat,
|
||||
input_queue_prefix=settings.rabbitmq.service_request_queue_prefix,
|
||||
tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix,
|
||||
tenant_exchange_name=settings.rabbitmq.tenant_exchange_name,
|
||||
service_request_exchange_name=settings.rabbitmq.service_request_exchange_name,
|
||||
service_response_exchange_name=settings.rabbitmq.service_response_exchange_name,
|
||||
service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name,
|
||||
queue_expiration_time=settings.rabbitmq.queue_expiration_time,
|
||||
pod_name=settings.kubernetes.pod_name,
|
||||
)
|
||||
manager = AsyncQueueManager(
|
||||
config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback
|
||||
)
|
||||
else:
|
||||
manager = QueueManager(settings)
|
||||
|
||||
app = add_health_check_endpoint(app, manager.is_ready)
|
||||
|
||||
webserver_thread = create_webserver_thread_from_settings(app, settings)
|
||||
webserver_thread.start()
|
||||
|
||||
queue_manager.start_consuming(callback)
|
||||
if isinstance(manager, AsyncQueueManager):
|
||||
asyncio.run(manager.run())
|
||||
elif isinstance(manager, QueueManager):
|
||||
manager.start_consuming(callback)
|
||||
else:
|
||||
logger.warning(f"Behavior for type {type(manager)} is not defined")
|
||||
|
||||
258
pyinfra/queue/async_manager.py
Normal file
258
pyinfra/queue/async_manager.py
Normal file
@ -0,0 +1,258 @@
|
||||
import asyncio
|
||||
import json
|
||||
import signal
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, Set
|
||||
|
||||
import aiohttp
|
||||
from aio_pika import ExchangeType, IncomingMessage, Message, connect_robust
|
||||
from aio_pika.abc import (
|
||||
AbstractChannel,
|
||||
AbstractConnection,
|
||||
AbstractExchange,
|
||||
AbstractIncomingMessage,
|
||||
)
|
||||
from kn_utils.logging import logger
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential_jitter,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RabbitMQConfig:
|
||||
host: str
|
||||
port: int
|
||||
username: str
|
||||
password: str
|
||||
heartbeat: int
|
||||
input_queue_prefix: str
|
||||
tenant_event_queue_suffix: str
|
||||
tenant_exchange_name: str
|
||||
service_request_exchange_name: str
|
||||
service_response_exchange_name: str
|
||||
service_dead_letter_queue_name: str
|
||||
queue_expiration_time: int
|
||||
pod_name: str
|
||||
|
||||
connection_params: Dict[str, object] = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.connection_params = {
|
||||
"host": self.host,
|
||||
"port": self.port,
|
||||
"login": self.username,
|
||||
"password": self.password,
|
||||
"client_properties": {"heartbeat": self.heartbeat},
|
||||
}
|
||||
|
||||
|
||||
class AsyncQueueManager:
|
||||
def __init__(
|
||||
self,
|
||||
config: RabbitMQConfig,
|
||||
tenant_service_url: str,
|
||||
message_processor: Callable[[Dict[str, Any]], Dict[str, Any]],
|
||||
):
|
||||
self.config = config
|
||||
self.tenant_service_url = tenant_service_url
|
||||
self.message_processor = message_processor
|
||||
|
||||
self.connection: AbstractConnection | None = None
|
||||
self.channel: AbstractChannel | None = None
|
||||
self.tenant_exchange: AbstractExchange | None = None
|
||||
self.input_exchange: AbstractExchange | None = None
|
||||
self.output_exchange: AbstractExchange | None = None
|
||||
self.tenant_queues: Dict[str, AbstractChannel] = {}
|
||||
|
||||
async def connect(self) -> None:
|
||||
self.connection = await connect_robust(**self.config.connection_params)
|
||||
self.channel = await self.connection.channel()
|
||||
await self.channel.set_qos(prefetch_count=1)
|
||||
|
||||
async def is_ready(self) -> bool:
|
||||
await self.connect()
|
||||
return await self.channel.is_open
|
||||
|
||||
async def setup_exchanges(self) -> None:
|
||||
self.tenant_exchange = await self.channel.declare_exchange(
|
||||
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
||||
)
|
||||
self.input_exchange = await self.channel.declare_exchange(
|
||||
self.config.service_request_exchange_name, ExchangeType.DIRECT, durable=True
|
||||
)
|
||||
self.output_exchange = await self.channel.declare_exchange(
|
||||
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
||||
)
|
||||
|
||||
async def setup_tenant_queue(self) -> None:
|
||||
queue = await self.channel.declare_queue(
|
||||
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||
"x-expires": self.config.queue_expiration_time,
|
||||
"x-max-priority": 2,
|
||||
},
|
||||
)
|
||||
await queue.bind(self.tenant_exchange, routing_key="tenant.*")
|
||||
await queue.consume(self.process_tenant_message)
|
||||
|
||||
async def process_tenant_message(self, message: AbstractIncomingMessage) -> None:
|
||||
async with message.process():
|
||||
message_body = json.loads(message.body.decode())
|
||||
logger.debug(f"Tenant message received: {message_body}")
|
||||
tenant_id = message_body["tenantId"]
|
||||
routing_key = message.routing_key
|
||||
|
||||
if routing_key == "tenant.created":
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
elif routing_key == "tenant.delete":
|
||||
await self.delete_tenant_queues(tenant_id)
|
||||
|
||||
async def create_tenant_queues(self, tenant_id: str) -> None:
|
||||
queue_name = f"{self.config.input_queue_prefix}_{tenant_id}"
|
||||
logger.info(f"Declaring queue: {queue_name}")
|
||||
input_queue = await self.channel.declare_queue(
|
||||
queue_name,
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||
"x-expires": self.config.queue_expiration_time,
|
||||
"x-max-priority": 2,
|
||||
},
|
||||
)
|
||||
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
|
||||
await input_queue.consume(self.process_input_message)
|
||||
|
||||
self.tenant_queues[tenant_id] = input_queue
|
||||
logger.info(f"Created queues for tenant {tenant_id}")
|
||||
|
||||
async def delete_tenant_queues(self, tenant_id: str) -> None:
|
||||
if tenant_id in self.tenant_queues:
|
||||
# somehow queue.delete() does not work here
|
||||
await self.channel.queue_delete(f"{self.config.input_queue_prefix}_{tenant_id}")
|
||||
del self.tenant_queues[tenant_id]
|
||||
logger.info(f"Deleted queues for tenant {tenant_id}")
|
||||
|
||||
async def process_input_message(self, message: IncomingMessage) -> None:
|
||||
async def process_message_body_and_await_result(unpacked_message_body):
|
||||
return self.message_processor(unpacked_message_body)
|
||||
|
||||
async with message.process(ignore_processed=True):
|
||||
if message.redelivered:
|
||||
logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.")
|
||||
await message.nack(requeue=False)
|
||||
return
|
||||
|
||||
if message.body.decode("utf-8") == "STOP":
|
||||
logger.info("Received stop signal, stopping consumption...")
|
||||
await message.ack()
|
||||
# TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal
|
||||
await self.shutdown()
|
||||
return
|
||||
|
||||
try:
|
||||
tenant_id = message.routing_key
|
||||
|
||||
filtered_message_headers = (
|
||||
{k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {}
|
||||
)
|
||||
|
||||
logger.debug(f"Processing message with {filtered_message_headers=}.")
|
||||
|
||||
result: dict = await (
|
||||
process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers})
|
||||
or {}
|
||||
)
|
||||
|
||||
if result:
|
||||
await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
|
||||
await message.ack()
|
||||
logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
|
||||
else:
|
||||
raise ValueError(f"Could not process message with {message.body=}.")
|
||||
|
||||
except json.JSONDecodeError:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Invalid JSON in input message: {message.body}")
|
||||
except FileNotFoundError as e:
|
||||
logger.warning(f"{e}, declining message with {message.delivery_tag=}.")
|
||||
await message.nack(requeue=False)
|
||||
except Exception as e:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Error processing input message: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
|
||||
await self.output_exchange.publish(
|
||||
Message(body=json.dumps(result).encode(), headers=headers),
|
||||
routing_key=tenant_id,
|
||||
)
|
||||
logger.info(f"Published result to queue {tenant_id}.")
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential_jitter(initial=1, max=10),
|
||||
retry=retry_if_exception_type(aiohttp.ClientResponseError),
|
||||
reraise=True,
|
||||
)
|
||||
async def fetch_active_tenants(self) -> Set[str]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(self.tenant_service_url) as response:
|
||||
response.raise_for_status()
|
||||
if response.headers["content-type"].lower() == "application/json":
|
||||
data = await response.json()
|
||||
return {tenant["tenantId"] for tenant in data}
|
||||
else:
|
||||
logger.error(
|
||||
f"Failed to fetch active tenants. Content type is not JSON: {response.headers['content-type'].lower()}"
|
||||
)
|
||||
return set()
|
||||
|
||||
async def initialize_tenant_queues(self) -> None:
|
||||
try:
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
except aiohttp.ClientResponseError:
|
||||
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
|
||||
active_tenants = set()
|
||||
for tenant_id in active_tenants:
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
|
||||
async def run(self) -> None:
|
||||
stop = asyncio.Event()
|
||||
|
||||
def signal_handler(*_):
|
||||
logger.info("Signal received, shutting down...")
|
||||
stop.set()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, signal_handler)
|
||||
|
||||
try:
|
||||
await self.connect()
|
||||
await self.setup_exchanges()
|
||||
await self.initialize_tenant_queues()
|
||||
await self.setup_tenant_queue()
|
||||
|
||||
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
||||
await stop.wait() # Run until stop signal received
|
||||
except asyncio.CancelledError:
|
||||
logger.warning("Operation cancelled.")
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred: {e}", exc_info=True)
|
||||
finally:
|
||||
await self.shutdown()
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
logger.info("Shutting down RabbitMQ handler...")
|
||||
if self.channel:
|
||||
await self.channel.close()
|
||||
if self.connection:
|
||||
await self.connection.close()
|
||||
logger.info("RabbitMQ handler shut down successfully.")
|
||||
@ -19,6 +19,17 @@ class DossierIdFileIdDownloadPayload(BaseModel):
|
||||
return f"{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
|
||||
|
||||
|
||||
class TenantIdDossierIdFileIdDownloadPayload(BaseModel):
|
||||
tenantId: str
|
||||
dossierId: str
|
||||
fileId: str
|
||||
targetFileExtension: str
|
||||
|
||||
@property
|
||||
def targetFilePath(self):
|
||||
return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
|
||||
|
||||
|
||||
class DossierIdFileIdUploadPayload(BaseModel):
|
||||
dossierId: str
|
||||
fileId: str
|
||||
@ -27,6 +38,17 @@ class DossierIdFileIdUploadPayload(BaseModel):
|
||||
@property
|
||||
def responseFilePath(self):
|
||||
return f"{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
|
||||
|
||||
|
||||
class TenantIdDossierIdFileIdUploadPayload(BaseModel):
|
||||
tenantId: str
|
||||
dossierId: str
|
||||
fileId: str
|
||||
responseFileExtension: str
|
||||
|
||||
@property
|
||||
def responseFilePath(self):
|
||||
return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
|
||||
|
||||
|
||||
class TargetResponseFilePathDownloadPayload(BaseModel):
|
||||
@ -55,7 +77,9 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
|
||||
"""
|
||||
|
||||
try:
|
||||
if "dossierId" in raw_payload:
|
||||
if "tenantId" in raw_payload and "dossierId" in raw_payload:
|
||||
payload = TenantIdDossierIdFileIdDownloadPayload(**raw_payload)
|
||||
elif "tenantId" not in raw_payload and "dossierId" in raw_payload:
|
||||
payload = DossierIdFileIdDownloadPayload(**raw_payload)
|
||||
else:
|
||||
payload = TargetResponseFilePathDownloadPayload(**raw_payload)
|
||||
@ -106,7 +130,9 @@ def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, dat
|
||||
"""
|
||||
|
||||
try:
|
||||
if "dossierId" in raw_payload:
|
||||
if "tenantId" in raw_payload and "dossierId" in raw_payload:
|
||||
payload = TenantIdDossierIdFileIdUploadPayload(**raw_payload)
|
||||
elif "tenantId" not in raw_payload and "dossierId" in raw_payload:
|
||||
payload = DossierIdFileIdUploadPayload(**raw_payload)
|
||||
else:
|
||||
payload = TargetResponseFilePathUploadPayload(**raw_payload)
|
||||
|
||||
@ -39,13 +39,16 @@ def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExpo
|
||||
if tracing_type == "azure_monitor":
|
||||
# Configure OpenTelemetry to use Azure Monitor with the
|
||||
# APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.
|
||||
logger.info("Azure Monitor tracing enabled.")
|
||||
configure_azure_monitor()
|
||||
try:
|
||||
configure_azure_monitor()
|
||||
logger.info("Azure Monitor tracing enabled.")
|
||||
except Exception as exception:
|
||||
logger.warning(f"Azure Monitor tracing could not be enabled: {exception}")
|
||||
elif tracing_type == "opentelemetry":
|
||||
logger.info("OpenTelemetry tracing enabled.")
|
||||
configure_opentelemtry_tracing(settings, service_name, exporter)
|
||||
logger.info("OpenTelemetry tracing enabled.")
|
||||
else:
|
||||
raise Exception(f"Unknown tracing type: {tracing_type}")
|
||||
logger.warning(f"Unknown tracing type: {tracing_type}. Tracing could not be enabled.")
|
||||
|
||||
|
||||
def configure_opentelemtry_tracing(settings: Dynaconf, service_name: str = None, exporter: SpanExporter = None):
|
||||
|
||||
@ -1,3 +1,4 @@
|
|||||||||
import inspect
|
|||||||||
import logging
|
|||||||||
import threading
|
|||||||||
from typing import Callable
|
|||||||||
@ -8,15 +9,10 @@ from fastapi import FastAPI
|
|||||||||
|
|||||||||
from pyinfra.config.loader import validate_settings
|
|||||||||
from pyinfra.config.validators import webserver_validators
|
|||||||||
from pyinfra.utils.opentelemetry import instrument_app, setup_trace
|
|||||||||
|
|||||||||
|
|||||||||
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
|||||||||
validate_settings(settings, validators=webserver_validators)
|
|||||||||
|
|||||||||
if settings.tracing.enabled:
|
|||||||||
return create_webserver_thread_with_tracing(app, settings)
|
|||||||||
|
|||||||||
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
|
|||||||||
|
|||||||||
|
|||||||||
@ -29,18 +25,6 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
|
|||||||||
return thread
|
|||||||||
|
|||||||||
|
|||||||||
def create_webserver_thread_with_tracing(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
def inner():
|
||||||||
setup_trace(settings)
|
|||||||||
instrument_app(app)
|
|||||||||
uvicorn.run(app, port=settings.webserver.port, host=settings.webserver.host, log_level=logging.WARNING)
|
|||||||||
|
|||||||||
thread = threading.Thread(target=inner)
|
|||||||||
thread.daemon = True
|
|||||||||
|
|||||||||
return thread
|
|||||||||
|
|||||||||
|
|||||||||
HealthFunction = Callable[[], bool]
|
|||||||||
|
|||||||||
|
|||||||||
@ -48,13 +32,23 @@ def add_health_check_endpoint(app: FastAPI, health_function: HealthFunction) ->
|
|||||||||
"""Add a health check endpoint to the app. The health function should return True if the service is healthy,
|
|||||||||
and False otherwise. The health function is called when the endpoint is hit.
|
|||||||||
"""
|
|||||||||
if inspect.iscoroutinefunction(health_function):
|
|||||||||
|
|||||||||
@app.get("/health")
|
|||||||||
@app.get("/ready")
|
|||||||||
def check_health():
|
|||||||||
if health_function():
|
|||||||||
return {"status": "OK"}, 200
|
|||||||||
else:
|
|||||||||
@app.get("/health")
|
|||||||||
@app.get("/ready")
|
|||||||||
async def async_check_health():
|
|||||||||
alive = await health_function()
|
|||||||||
if alive:
|
|||||||||
return {"status": "OK"}, 200
|
|||||||||
return {"status": "Service Unavailable"}, 503
|
|||||||||
|
|||||||||
else:
|
|||||||||
|
|||||||||
@app.get("/health")
|
|||||||||
@app.get("/ready")
|
|||||||||
def check_health():
|
|||||||||
if health_function():
|
|||||||||
return {"status": "OK"}, 200
|
|||||||||
return {"status": "Service Unavailable"}, 503
|
|||||||||
|
|||||||||
return app
|
|||||||||
|
|||||||||
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pyinfra"
|
||||
version = "2.3.0"
|
||||
version = "3.0.0"
|
||||
description = ""
|
||||
authors = ["Team Research <research@knecon.com>"]
|
||||
license = "All rights reseverd"
|
||||
@ -34,6 +34,9 @@ opentelemetry-instrumentation-requests = "^0.46b0"
|
||||
opentelemetry-instrumentation-fastapi = "^0.46b0"
|
||||
wcwidth = "<=0.2.12"
|
||||
azure-monitor-opentelemetry = "^1.6.0"
|
||||
aio-pika = "^9.4.2"
|
||||
aiohttp = "^3.9.5"
|
||||
tenacity = "^8.5.0"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^7"
|
||||
|
||||
150
scripts/send_async_request.py
Normal file
150
scripts/send_async_request.py
Normal file
@ -0,0 +1,150 @@
|
||||
import asyncio
|
||||
import gzip
|
||||
import json
|
||||
from operator import itemgetter
|
||||
from typing import Any, Dict
|
||||
|
||||
from aio_pika import Message
|
||||
from aio_pika.abc import AbstractIncomingMessage
|
||||
from kn_utils.logging import logger
|
||||
|
||||
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
|
||||
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
|
||||
from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings
|
||||
|
||||
settings = load_settings(local_pyinfra_root_path / "config/")
|
||||
|
||||
|
||||
async def dummy_message_processor(message: Dict[str, Any]) -> Dict[str, Any]:
|
||||
logger.info(f"Processing message: {message}")
|
||||
# await asyncio.sleep(1) # Simulate processing time
|
||||
|
||||
storage = get_s3_storage_from_settings(settings)
|
||||
tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(message)
|
||||
suffix = message["responseFileExtension"]
|
||||
|
||||
object_name = f"{tenant_id}/{dossier_id}/{file_id}.{message['targetFileExtension']}"
|
||||
original_content = json.loads(gzip.decompress(storage.get_object(object_name)))
|
||||
processed_content = {
|
||||
"processedPages": original_content["numberOfPages"],
|
||||
"processedSectionTexts": f"Processed: {original_content['sectionTexts']}",
|
||||
}
|
||||
|
||||
processed_object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
|
||||
processed_data = gzip.compress(json.dumps(processed_content).encode("utf-8"))
|
||||
storage.put_object(processed_object_name, processed_data)
|
||||
|
||||
processed_message = message.copy()
|
||||
processed_message["processed"] = True
|
||||
processed_message["processor_message"] = "This message was processed by the dummy processor"
|
||||
|
||||
logger.info(f"Finished processing message. Result: {processed_message}")
|
||||
return processed_message
|
||||
|
||||
|
||||
async def on_response_message_callback(storage: S3Storage):
|
||||
async def on_message(message: AbstractIncomingMessage) -> None:
|
||||
async with message.process(ignore_processed=True):
|
||||
if not message.body:
|
||||
raise ValueError
|
||||
response = json.loads(message.body)
|
||||
logger.info(f"Received {response}")
|
||||
logger.info(f"Message headers: {message.properties.headers}")
|
||||
await message.ack()
|
||||
tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
|
||||
suffix = response["responseFileExtension"]
|
||||
result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
|
||||
result = json.loads(gzip.decompress(result))
|
||||
logger.info(f"Contents of result on storage: {result}")
|
||||
|
||||
return on_message
|
||||
|
||||
|
||||
def upload_json_and_make_message_body(tenant_id: str):
|
||||
dossier_id, file_id, suffix = "dossier", "file", "json.gz"
|
||||
content = {
|
||||
"numberOfPages": 7,
|
||||
"sectionTexts": "data",
|
||||
}
|
||||
|
||||
object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
|
||||
data = gzip.compress(json.dumps(content).encode("utf-8"))
|
||||
|
||||
storage = get_s3_storage_from_settings(settings)
|
||||
if not storage.has_bucket():
|
||||
storage.make_bucket()
|
||||
storage.put_object(object_name, data)
|
||||
|
||||
message_body = {
|
||||
"tenantId": tenant_id,
|
||||
"dossierId": dossier_id,
|
||||
"fileId": file_id,
|
||||
"targetFileExtension": suffix,
|
||||
"responseFileExtension": f"result.{suffix}",
|
||||
}
|
||||
return message_body, storage
|
||||
|
||||
|
||||
async def test_rabbitmq_handler() -> None:
|
||||
tenant_service_url = settings.storage.tenant_server.endpoint
|
||||
|
||||
config = RabbitMQConfig(
|
||||
host=settings.rabbitmq.host,
|
||||
port=settings.rabbitmq.port,
|
||||
username=settings.rabbitmq.username,
|
||||
password=settings.rabbitmq.password,
|
||||
heartbeat=settings.rabbitmq.heartbeat,
|
||||
input_queue_prefix=settings.rabbitmq.service_request_queue_prefix,
|
||||
tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix,
|
||||
tenant_exchange_name=settings.rabbitmq.tenant_exchange_name,
|
||||
service_request_exchange_name=settings.rabbitmq.service_request_exchange_name,
|
||||
service_response_exchange_name=settings.rabbitmq.service_response_exchange_name,
|
||||
service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name,
|
||||
queue_expiration_time=settings.rabbitmq.queue_expiration_time,
|
||||
pod_name=settings.kubernetes.pod_name,
|
||||
)
|
||||
|
||||
handler = AsyncQueueManager(config, tenant_service_url, dummy_message_processor)
|
||||
|
||||
await handler.connect()
|
||||
await handler.setup_exchanges()
|
||||
|
||||
tenant_id = "test_tenant"
|
||||
|
||||
# Test tenant creation
|
||||
create_message = {"tenantId": tenant_id}
|
||||
await handler.tenant_exchange.publish(
|
||||
Message(body=json.dumps(create_message).encode()), routing_key="tenant.created"
|
||||
)
|
||||
logger.info(f"Sent create tenant message for {tenant_id}")
|
||||
await asyncio.sleep(0.5) # Wait for queue creation
|
||||
|
||||
# Prepare service request
|
||||
service_request, storage = upload_json_and_make_message_body(tenant_id)
|
||||
|
||||
# Test service request
|
||||
await handler.input_exchange.publish(Message(body=json.dumps(service_request).encode()), routing_key=tenant_id)
|
||||
logger.info(f"Sent service request for {tenant_id}")
|
||||
await asyncio.sleep(5) # Wait for message processing
|
||||
|
||||
# Consume service request
|
||||
response_queue = await handler.channel.declare_queue(name=f"response_queue_{tenant_id}")
|
||||
await response_queue.bind(exchange=handler.output_exchange, routing_key=tenant_id)
|
||||
callback = await on_response_message_callback(storage)
|
||||
await response_queue.consume(callback=callback)
|
||||
|
||||
await asyncio.sleep(5) # Wait for message processing
|
||||
|
||||
# Test tenant deletion
|
||||
delete_message = {"tenantId": tenant_id}
|
||||
await handler.tenant_exchange.publish(
|
||||
Message(body=json.dumps(delete_message).encode()), routing_key="tenant.delete"
|
||||
)
|
||||
logger.info(f"Sent delete tenant message for {tenant_id}")
|
||||
await asyncio.sleep(0.5) # Wait for queue deletion
|
||||
|
||||
await handler.connection.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(test_rabbitmq_handler())
|
||||
@ -1,31 +1,41 @@
|
||||
version: '2'
|
||||
version: '3.8'
|
||||
services:
|
||||
minio:
|
||||
image: minio/minio:RELEASE.2022-06-11T19-55-32Z
|
||||
image: minio/minio:latest
|
||||
container_name: minio
|
||||
ports:
|
||||
- "9000:9000"
|
||||
environment:
|
||||
- MINIO_ROOT_PASSWORD=password
|
||||
- MINIO_ROOT_USER=root
|
||||
volumes:
|
||||
- /tmp/minio_store:/data
|
||||
- /tmp/data/minio_store:/data
|
||||
command: server /data
|
||||
network_mode: "bridge"
|
||||
network_mode: "bridge"
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
rabbitmq:
|
||||
image: docker.io/bitnami/rabbitmq:3.9.8
|
||||
image: docker.io/bitnami/rabbitmq:latest
|
||||
container_name: rabbitmq
|
||||
ports:
|
||||
- '4369:4369'
|
||||
- '5551:5551'
|
||||
- '5552:5552'
|
||||
# - '4369:4369'
|
||||
# - '5551:5551'
|
||||
# - '5552:5552'
|
||||
- '5672:5672'
|
||||
- '25672:25672'
|
||||
- '15672:15672'
|
||||
# - '25672:25672'
|
||||
environment:
|
||||
- RABBITMQ_SECURE_PASSWORD=yes
|
||||
- RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100%
|
||||
- RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi
|
||||
- RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS=true
|
||||
network_mode: "bridge"
|
||||
volumes:
|
||||
- /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
|
||||
volumes:
|
||||
mdata:
|
||||
- /tmp/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
|
||||
healthcheck:
|
||||
test: [ "CMD", "curl", "-f", "http://localhost:15672" ]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user