Compare commits
101 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ef4246d1e | ||
|
|
841c492639 | ||
|
|
ead069d3a7 | ||
|
|
044ea6cf0a | ||
|
|
ff7547e2c6 | ||
|
|
fbf79ef758 | ||
|
|
f382887d40 | ||
|
|
5c4400aa8b | ||
|
|
5ce66f18a0 | ||
|
|
ea0c55930a | ||
|
|
87f57e2244 | ||
|
|
3fb8c4e641 | ||
|
|
e23f63acf0 | ||
|
|
d3fecc518e | ||
|
|
341500d463 | ||
|
|
e002f77fd5 | ||
|
|
3c6d8f2dcc | ||
|
|
f6d6ba40bb | ||
|
|
6a0bbad108 | ||
|
|
527a671a75 | ||
|
|
cf91189728 | ||
|
|
61a6d0eeed | ||
|
|
bc0b355ff9 | ||
|
|
235e27b74c | ||
|
|
1540c2894e | ||
|
|
9b60594ce1 | ||
|
|
3d3c76b466 | ||
|
|
9d4ec84b49 | ||
|
|
8891249d7a | ||
|
|
e51e5c33eb | ||
|
|
04c90533b6 | ||
|
|
86af05c12c | ||
|
|
c6e336cb35 | ||
|
|
bf6f95f3e0 | ||
|
|
ed2bd1ec86 | ||
|
|
9906f68e0a | ||
|
|
0af648d66c | ||
|
|
46dc1fdce4 | ||
|
|
bd2f0b9b9a | ||
|
|
131afd7d3e | ||
|
|
98532c60ed | ||
|
|
45377ba172 | ||
|
|
f855224e29 | ||
|
|
541219177f | ||
|
|
4119a7d7d7 | ||
|
|
e2edfa7260 | ||
|
|
b70b16c541 | ||
|
|
e8d9326e48 | ||
|
|
9669152e14 | ||
|
|
ed3f8088e1 | ||
|
|
66eaa9a748 | ||
|
|
3a04359320 | ||
|
|
b46fcbd977 | ||
|
|
e75df42bec | ||
|
|
3bab86fe83 | ||
|
|
c5d53b8665 | ||
|
|
09d39930e7 | ||
|
|
a81f1bf31a | ||
|
|
0783e95d22 | ||
|
|
8ec13502a9 | ||
|
|
43881de526 | ||
|
|
67c30a5620 | ||
|
|
8e21b2144c | ||
|
|
5b45cae9a0 | ||
|
|
f2a5a2ea0e | ||
|
|
2133933d25 | ||
|
|
4c8dc6ccc0 | ||
|
|
5f31e2b15f | ||
|
|
88aef57c5f | ||
|
|
2b129b35f4 | ||
|
|
facb9726f9 | ||
|
|
b6a2069a6a | ||
|
|
f626ef2e6f | ||
|
|
318779413a | ||
|
|
f27b1fbba1 | ||
|
|
f2018f9c86 | ||
|
|
a5167d1230 | ||
|
|
1e939febc2 | ||
|
|
564f2cbb43 | ||
|
|
fa44f36088 | ||
|
|
2970823cc1 | ||
|
|
dba348a621 | ||
|
|
5020e54dcc | ||
|
|
2bc332831e | ||
|
|
b3f1529be2 | ||
|
|
789f6a7f7c | ||
|
|
06ce8bbb22 | ||
|
|
536284ed84 | ||
|
|
aeac1c58f9 | ||
|
|
b12b1ce42b | ||
|
|
50b7a877e9 | ||
|
|
f3d0f24ea6 | ||
|
|
70d3a210a1 | ||
|
|
f935056fa9 | ||
|
|
f175633f30 | ||
|
|
ceac21c1ef | ||
|
|
0d232226fd | ||
|
|
9d55b3be89 | ||
|
|
edba6fc4da | ||
|
|
c5d8a6ed84 | ||
|
|
c16000c774 |
2
.dvc/.gitignore
vendored
Normal file
2
.dvc/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
/config.local
|
||||
/cache
|
||||
5
.dvc/config
Normal file
5
.dvc/config
Normal file
@ -0,0 +1,5 @@
|
||||
[core]
|
||||
remote = azure
|
||||
['remote "azure"']
|
||||
url = azure://pyinfra-dvc
|
||||
connection_string =
|
||||
3
.dvcignore
Normal file
3
.dvcignore
Normal file
@ -0,0 +1,3 @@
|
||||
# Add patterns of files dvc should ignore, which could improve
|
||||
# the performance. Learn more at
|
||||
# https://dvc.org/doc/user-guide/dvcignore
|
||||
@ -1,49 +1,23 @@
|
||||
# CI for services, check gitlab repo for python package CI
|
||||
include:
|
||||
- project: "Gitlab/gitlab"
|
||||
ref: 0.3.0
|
||||
file: "/ci-templates/research/python_pkg_venv_test_build_release_gitlab-ci.yml"
|
||||
|
||||
default:
|
||||
image: python:3.10
|
||||
ref: main
|
||||
file: "/ci-templates/research/python_pkg-test-build-release.gitlab-ci.yml"
|
||||
|
||||
# set project variables here
|
||||
variables:
|
||||
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
|
||||
GITLAB_PYPI_URL: https://gitlab.knecon.com/api/v4/projects/${CI_PROJECT_ID}/packages/pypi
|
||||
PYPI_REGISTRY_RESEARCH: https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi
|
||||
POETRY_SOURCE_REF_RESEARCH: gitlab-research
|
||||
PYPI_REGISTRY_RED: https://gitlab.knecon.com/api/v4/groups/12/-/packages/pypi
|
||||
POETRY_SOURCE_REF_RED: gitlab-red
|
||||
PYPI_REGISTRY_FFORESIGHT: https://gitlab.knecon.com/api/v4/groups/269/-/packages/pypi
|
||||
POETRY_SOURCE_REF_FFORESIGHT: gitlab-fforesight
|
||||
# POETRY_HOME: /opt/poetry
|
||||
NEXUS_PROJECT_DIR: research # subfolder in Nexus docker-gin where your container will be stored
|
||||
IMAGENAME: $CI_PROJECT_NAME # if the project URL is gitlab.example.com/group-name/project-1, CI_PROJECT_NAME is project-1
|
||||
REPORTS_DIR: reports
|
||||
FF_USE_FASTZIP: "true" # enable fastzip - a faster zip implementation that also supports level configuration.
|
||||
ARTIFACT_COMPRESSION_LEVEL: default # can also be set to fastest, fast, slow and slowest. If just enabling fastzip is not enough try setting this to fastest or fast.
|
||||
CACHE_COMPRESSION_LEVEL: default # same as above, but for caches
|
||||
# TRANSFER_METER_FREQUENCY: 5s # will display transfer progress every 5 seconds for artifacts and remote caches. For debugging purposes.
|
||||
|
||||
setup-poetry-venv:
|
||||
stage: setup
|
||||
script:
|
||||
- env # check env vars
|
||||
# install poetry & return versions
|
||||
- pip install --upgrade pip
|
||||
- pip -V
|
||||
- python -V
|
||||
- pip install poetry
|
||||
- poetry -V
|
||||
# configure poetry
|
||||
- poetry config installer.max-workers 10
|
||||
- poetry config virtualenvs.in-project true
|
||||
- poetry config repositories.${POETRY_SOURCE_REF_RESEARCH} ${PYPI_REGISTRY_RESEARCH}
|
||||
- poetry config http-basic.${POETRY_SOURCE_REF_RESEARCH} ${CI_REGISTRY_USER} ${CI_JOB_TOKEN}
|
||||
- poetry config repositories.${POETRY_SOURCE_REF_RED} ${PYPI_REGISTRY_RED}
|
||||
- poetry config http-basic.${POETRY_SOURCE_REF_RED} ${CI_REGISTRY_USER} ${CI_JOB_TOKEN}
|
||||
- poetry config repositories.${POETRY_SOURCE_REF_FFORESIGHT} ${PYPI_REGISTRY_FFORESIGHT}
|
||||
- poetry config http-basic.${POETRY_SOURCE_REF_FFORESIGHT} ${CI_REGISTRY_USER} ${CI_JOB_TOKEN}
|
||||
# create and activate venv
|
||||
- poetry env use $(which python)
|
||||
- source .venv/bin/activate
|
||||
- python -m ensurepip
|
||||
- env # check env vars again
|
||||
# install from poetry.lock file
|
||||
- poetry install --all-extras -vvv
|
||||
|
||||
run-tests:
|
||||
script:
|
||||
- echo "Disabled until we have an automated way to run docker compose before tests."
|
||||
############
|
||||
# UNIT TESTS
|
||||
unit-tests:
|
||||
variables:
|
||||
###### UPDATE/EDIT ######
|
||||
UNIT_TEST_DIR: "tests/unit_test"
|
||||
|
||||
@ -5,7 +5,7 @@ default_language_version:
|
||||
python: python3.10
|
||||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v4.6.0
|
||||
rev: v5.0.0
|
||||
hooks:
|
||||
- id: trailing-whitespace
|
||||
- id: end-of-file-fixer
|
||||
@ -26,6 +26,7 @@ repos:
|
||||
rev: v3.0.0a5
|
||||
hooks:
|
||||
- id: pylint
|
||||
language: system
|
||||
args:
|
||||
- --disable=C0111,R0903
|
||||
- --max-line-length=120
|
||||
@ -38,7 +39,7 @@ repos:
|
||||
- --profile black
|
||||
|
||||
- repo: https://github.com/psf/black
|
||||
rev: 24.4.2
|
||||
rev: 24.10.0
|
||||
hooks:
|
||||
- id: black
|
||||
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
|
||||
@ -46,7 +47,7 @@ repos:
|
||||
- --line-length=120
|
||||
|
||||
- repo: https://github.com/compilerla/conventional-pre-commit
|
||||
rev: v3.2.0
|
||||
rev: v3.6.0
|
||||
hooks:
|
||||
- id: conventional-pre-commit
|
||||
pass_filenames: false
|
||||
|
||||
@ -1 +1 @@
|
||||
3.10.12
|
||||
3.10
|
||||
|
||||
20
README.md
20
README.md
@ -6,6 +6,7 @@
|
||||
4. [ Module Installation ](#module-installation)
|
||||
5. [ Scripts ](#scripts)
|
||||
6. [ Tests ](#tests)
|
||||
7. [ Opentelemetry protobuf dependency hell ](#opentelemetry-protobuf-dependency-hell)
|
||||
|
||||
## About
|
||||
|
||||
@ -33,7 +34,7 @@ the [complete example](pyinfra/examples.py).
|
||||
| Environment Variable | Internal / .toml Name | Description |
|
||||
| ------------------------------------------ | --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| LOGGING\_\_LEVEL | logging.level | Log level |
|
||||
| CONCURRENCY\_\_ENABLED | concurrency.enabled | Enable multi tenant queue mode |
|
||||
| DYNAMIC_TENANT_QUEUES\_\_ENABLED | dynamic_tenant_queues.enabled | Enable queues per tenant that are dynamically created mode |
|
||||
| METRICS\_\_PROMETHEUS\_\_ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
|
||||
| METRICS\_\_PROMETHEUS\_\_PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
|
||||
| WEBSERVER\_\_HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
|
||||
@ -72,6 +73,17 @@ the [complete example](pyinfra/examples.py).
|
||||
| TRACING\_\_OPENTELEMETRY\_\_EXPORTER | tracing.opentelemetry.exporter | Name of exporter |
|
||||
| KUBERNETES\_\_POD_NAME | kubernetes.pod_name | Service pod name |
|
||||
|
||||
## Setup
|
||||
**IMPORTANT** you need to set the following environment variables before running the setup script:
|
||||
- ``$NEXUS_USER`` your Nexus user (usually equal to firstname.lastname@knecon.com)
|
||||
- ``$NEXUS_PASSWORD`` your Nexus password (usually equal to your Azure Login)
|
||||
|
||||
```shell
|
||||
# create venv and activate it
|
||||
source ./scripts/setup/devenvsetup.sh {{ cookiecutter.python_version }} $NEXUS_USER $NEXUS_PASSWORD
|
||||
source .venv/bin/activate
|
||||
```
|
||||
|
||||
### OpenTelemetry
|
||||
|
||||
Open telemetry (vis its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be
|
||||
@ -200,3 +212,9 @@ $ python scripts/send_request.py
|
||||
|
||||
Tests require a running minio and rabbitmq container, meaning you have to run `docker compose up` in the tests folder
|
||||
before running the tests.
|
||||
|
||||
## OpenTelemetry Protobuf Dependency Hell
|
||||
|
||||
**Note**: Status 2025/01/09: the currently used `opentelemetry-exporter-otlp-proto-http` version `1.25.0` requires
|
||||
a `protobuf` version < `5.x.x` and is not compatible with the latest protobuf version `5.27.x`. This is an [open issue](https://github.com/open-telemetry/opentelemetry-python/issues/3958) in opentelemetry, because [support for 4.25.x ends in Q2 '25](https://protobuf.dev/support/version-support/#python).
|
||||
Therefore, we should keep this in mind and update the dependency once opentelemetry includes support for `protobuf 5.27.x`.
|
||||
|
||||
6380
poetry.lock
generated
6380
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,9 @@
|
||||
import asyncio
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
@ -16,8 +20,79 @@ from pyinfra.webserver.prometheus import (
|
||||
from pyinfra.webserver.utils import (
|
||||
add_health_check_endpoint,
|
||||
create_webserver_thread_from_settings,
|
||||
run_async_webserver,
|
||||
)
|
||||
|
||||
shutdown_flag = False
|
||||
|
||||
|
||||
async def graceful_shutdown(manager: AsyncQueueManager, queue_task, webserver_task):
|
||||
global shutdown_flag
|
||||
shutdown_flag = True
|
||||
logger.info("SIGTERM received, shutting down gracefully...")
|
||||
|
||||
if queue_task and not queue_task.done():
|
||||
queue_task.cancel()
|
||||
|
||||
# await queue manager shutdown
|
||||
await asyncio.gather(queue_task, manager.shutdown(), return_exceptions=True)
|
||||
|
||||
if webserver_task and not webserver_task.done():
|
||||
webserver_task.cancel()
|
||||
|
||||
# await webserver shutdown
|
||||
await asyncio.gather(webserver_task, return_exceptions=True)
|
||||
|
||||
logger.info("Shutdown complete.")
|
||||
|
||||
|
||||
async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||
"""Run the async webserver and the async queue manager concurrently."""
|
||||
queue_task = None
|
||||
webserver_task = None
|
||||
tenant_api_available = True
|
||||
|
||||
# add signal handler for SIGTERM and SIGINT
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.add_signal_handler(
|
||||
signal.SIGTERM, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
|
||||
)
|
||||
loop.add_signal_handler(
|
||||
signal.SIGINT, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
|
||||
)
|
||||
|
||||
try:
|
||||
active_tenants = await manager.fetch_active_tenants()
|
||||
|
||||
queue_task = asyncio.create_task(manager.run(active_tenants=active_tenants), name="queues")
|
||||
webserver_task = asyncio.create_task(run_async_webserver(app, port, host), name="webserver")
|
||||
await asyncio.gather(queue_task, webserver_task)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Main task was cancelled, initiating shutdown.")
|
||||
except AMQPConnectionError as e:
|
||||
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
|
||||
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||
logger.warning("Tenant server did not answer - shutting down.")
|
||||
tenant_api_available = False
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
if shutdown_flag:
|
||||
logger.debug("Graceful shutdown already in progress.")
|
||||
else:
|
||||
logger.warning("Initiating shutdown due to error or manual interruption.")
|
||||
if not tenant_api_available:
|
||||
sys.exit(0)
|
||||
if queue_task and not queue_task.done():
|
||||
queue_task.cancel()
|
||||
|
||||
if webserver_task and not webserver_task.done():
|
||||
webserver_task.cancel()
|
||||
|
||||
await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True)
|
||||
logger.info("Shutdown complete.")
|
||||
|
||||
|
||||
def start_standard_queue_consumer(
|
||||
callback: Callback,
|
||||
@ -45,10 +120,11 @@ def start_standard_queue_consumer(
|
||||
if settings.tracing.enabled:
|
||||
setup_trace(settings)
|
||||
|
||||
instrument_pika()
|
||||
instrument_pika(dynamic_queues=settings.dynamic_tenant_queues.enabled)
|
||||
instrument_app(app)
|
||||
|
||||
if settings.concurrency.enabled:
|
||||
if settings.dynamic_tenant_queues.enabled:
|
||||
logger.info("Dynamic tenant queues enabled. Running async queues.")
|
||||
config = RabbitMQConfig(
|
||||
host=settings.rabbitmq.host,
|
||||
port=settings.rabbitmq.port,
|
||||
@ -65,19 +141,29 @@ def start_standard_queue_consumer(
|
||||
pod_name=settings.kubernetes.pod_name,
|
||||
)
|
||||
manager = AsyncQueueManager(
|
||||
config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback
|
||||
config=config,
|
||||
tenant_service_url=settings.storage.tenant_server.endpoint,
|
||||
message_processor=callback,
|
||||
max_concurrent_tasks=(
|
||||
settings.asyncio.max_concurrent_tasks if hasattr(settings.asyncio, "max_concurrent_tasks") else 10
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.info("Dynamic tenant queues disabled. Running sync queues.")
|
||||
manager = QueueManager(settings)
|
||||
|
||||
app = add_health_check_endpoint(app, manager.is_ready)
|
||||
|
||||
webserver_thread = create_webserver_thread_from_settings(app, settings)
|
||||
webserver_thread.start()
|
||||
|
||||
if isinstance(manager, AsyncQueueManager):
|
||||
asyncio.run(manager.run())
|
||||
asyncio.run(run_async_queues(manager, app, port=settings.webserver.port, host=settings.webserver.host))
|
||||
|
||||
elif isinstance(manager, QueueManager):
|
||||
manager.start_consuming(callback)
|
||||
webserver = create_webserver_thread_from_settings(app, settings)
|
||||
webserver.start()
|
||||
try:
|
||||
manager.start_consuming(callback)
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while consuming messages: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
else:
|
||||
logger.warning(f"Behavior for type {type(manager)} is not defined")
|
||||
|
||||
@ -1,12 +1,11 @@
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import json
|
||||
import signal
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, Set
|
||||
|
||||
import aiohttp
|
||||
from aio_pika import ExchangeType, IncomingMessage, Message, connect_robust
|
||||
from aio_pika import ExchangeType, IncomingMessage, Message, connect
|
||||
from aio_pika.abc import (
|
||||
AbstractChannel,
|
||||
AbstractConnection,
|
||||
@ -14,13 +13,14 @@ from aio_pika.abc import (
|
||||
AbstractIncomingMessage,
|
||||
AbstractQueue,
|
||||
)
|
||||
from kn_utils.logging import logger
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential_jitter,
|
||||
from aio_pika.exceptions import (
|
||||
ChannelClosed,
|
||||
ChannelInvalidStateError,
|
||||
ConnectionClosed,
|
||||
)
|
||||
from aiormq.exceptions import AMQPConnectionError
|
||||
from kn_utils.logging import logger
|
||||
from kn_utils.retry import retry
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -57,10 +57,12 @@ class AsyncQueueManager:
|
||||
config: RabbitMQConfig,
|
||||
tenant_service_url: str,
|
||||
message_processor: Callable[[Dict[str, Any]], Dict[str, Any]],
|
||||
max_concurrent_tasks: int = 10,
|
||||
):
|
||||
self.config = config
|
||||
self.tenant_service_url = tenant_service_url
|
||||
self.message_processor = message_processor
|
||||
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
|
||||
|
||||
self.connection: AbstractConnection | None = None
|
||||
self.channel: AbstractChannel | None = None
|
||||
@ -73,15 +75,42 @@ class AsyncQueueManager:
|
||||
|
||||
self.message_count: int = 0
|
||||
|
||||
@retry(tries=5, exceptions=AMQPConnectionError, reraise=True, logger=logger)
|
||||
async def connect(self) -> None:
|
||||
self.connection = await connect_robust(**self.config.connection_params)
|
||||
logger.info("Attempting to connect to RabbitMQ...")
|
||||
self.connection = await connect(**self.config.connection_params)
|
||||
self.connection.close_callbacks.add(self.on_connection_close)
|
||||
self.channel = await self.connection.channel()
|
||||
await self.channel.set_qos(prefetch_count=1)
|
||||
logger.info("Successfully connected to RabbitMQ")
|
||||
|
||||
async def on_connection_close(self, sender, exc):
|
||||
"""This is a callback for unexpected connection closures."""
|
||||
logger.debug(f"Sender: {sender}")
|
||||
if isinstance(exc, ConnectionClosed):
|
||||
logger.warning("Connection to RabbitMQ lost. Attempting to reconnect...")
|
||||
try:
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
await self.run(active_tenants=active_tenants)
|
||||
logger.debug("Reconnected to RabbitMQ successfully")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to reconnect to RabbitMQ: {e}")
|
||||
# cancel queue manager and webserver to shutdown service
|
||||
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
||||
[task.cancel() for task in tasks if task.get_name() in ["queues", "webserver"]]
|
||||
else:
|
||||
logger.debug("Connection closed on purpose.")
|
||||
|
||||
async def is_ready(self) -> bool:
|
||||
await self.connect()
|
||||
return await self.channel.is_open
|
||||
if self.connection is None or self.connection.is_closed:
|
||||
try:
|
||||
await self.connect()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to RabbitMQ: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_exchanges(self) -> None:
|
||||
self.tenant_exchange = await self.channel.declare_exchange(
|
||||
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
||||
@ -93,6 +122,12 @@ class AsyncQueueManager:
|
||||
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
||||
)
|
||||
|
||||
# we must declare DLQ to handle error messages
|
||||
self.dead_letter_queue = await self.channel.declare_queue(
|
||||
self.config.service_dead_letter_queue_name, durable=True
|
||||
)
|
||||
|
||||
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||
async def setup_tenant_queue(self) -> None:
|
||||
self.tenant_exchange_queue = await self.channel.declare_queue(
|
||||
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
|
||||
@ -101,7 +136,6 @@ class AsyncQueueManager:
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||
"x-expires": self.config.queue_expiration_time,
|
||||
"x-max-priority": 2,
|
||||
},
|
||||
)
|
||||
await self.tenant_exchange_queue.bind(self.tenant_exchange, routing_key="tenant.*")
|
||||
@ -110,34 +144,38 @@ class AsyncQueueManager:
|
||||
)
|
||||
|
||||
async def process_tenant_message(self, message: AbstractIncomingMessage) -> None:
|
||||
async with message.process():
|
||||
message_body = json.loads(message.body.decode())
|
||||
logger.debug(f"Tenant message received: {message_body}")
|
||||
tenant_id = message_body["tenantId"]
|
||||
routing_key = message.routing_key
|
||||
try:
|
||||
async with message.process():
|
||||
message_body = json.loads(message.body.decode())
|
||||
logger.debug(f"Tenant message received: {message_body}")
|
||||
tenant_id = message_body["tenantId"]
|
||||
routing_key = message.routing_key
|
||||
|
||||
if routing_key == "tenant.created":
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
elif routing_key == "tenant.delete":
|
||||
await self.delete_tenant_queues(tenant_id)
|
||||
if routing_key == "tenant.created":
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
elif routing_key == "tenant.delete":
|
||||
await self.delete_tenant_queues(tenant_id)
|
||||
except Exception as e:
|
||||
logger.error(e, exc_info=True)
|
||||
|
||||
async def create_tenant_queues(self, tenant_id: str) -> None:
|
||||
queue_name = f"{self.config.input_queue_prefix}_{tenant_id}"
|
||||
logger.info(f"Declaring queue: {queue_name}")
|
||||
input_queue = await self.channel.declare_queue(
|
||||
queue_name,
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||
"x-expires": self.config.queue_expiration_time,
|
||||
"x-max-priority": 2,
|
||||
},
|
||||
)
|
||||
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
|
||||
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
|
||||
self.tenant_queues[tenant_id] = input_queue
|
||||
logger.info(f"Created queues for tenant {tenant_id}")
|
||||
try:
|
||||
input_queue = await self.channel.declare_queue(
|
||||
queue_name,
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||
},
|
||||
)
|
||||
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
|
||||
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
|
||||
self.tenant_queues[tenant_id] = input_queue
|
||||
logger.info(f"Created and started consuming queue for tenant {tenant_id}")
|
||||
except Exception as e:
|
||||
logger.error(e, exc_info=True)
|
||||
|
||||
async def delete_tenant_queues(self, tenant_id: str) -> None:
|
||||
if tenant_id in self.tenant_queues:
|
||||
@ -149,7 +187,14 @@ class AsyncQueueManager:
|
||||
|
||||
async def process_input_message(self, message: IncomingMessage) -> None:
|
||||
async def process_message_body_and_await_result(unpacked_message_body):
|
||||
return self.message_processor(unpacked_message_body)
|
||||
async with self.semaphore:
|
||||
loop = asyncio.get_running_loop()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
|
||||
logger.info("Processing payload in a separate thread.")
|
||||
result = await loop.run_in_executor(
|
||||
thread_pool_executor, self.message_processor, unpacked_message_body
|
||||
)
|
||||
return result
|
||||
|
||||
async with message.process(ignore_processed=True):
|
||||
if message.redelivered:
|
||||
@ -189,14 +234,13 @@ class AsyncQueueManager:
|
||||
|
||||
except json.JSONDecodeError:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Invalid JSON in input message: {message.body}")
|
||||
logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
|
||||
except FileNotFoundError as e:
|
||||
logger.warning(f"{e}, declining message with {message.delivery_tag=}.")
|
||||
logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
|
||||
await message.nack(requeue=False)
|
||||
except Exception as e:
|
||||
await message.nack(requeue=False)
|
||||
logger.error(f"Error processing input message: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
self.message_count -= 1
|
||||
|
||||
@ -207,12 +251,7 @@ class AsyncQueueManager:
|
||||
)
|
||||
logger.info(f"Published result to queue {tenant_id}.")
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential_jitter(initial=1, max=10),
|
||||
retry=retry_if_exception_type(aiohttp.ClientResponseError),
|
||||
reraise=True,
|
||||
)
|
||||
@retry(tries=5, exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, logger=logger)
|
||||
async def fetch_active_tenants(self) -> Set[str]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(self.tenant_service_url) as response:
|
||||
@ -226,53 +265,65 @@ class AsyncQueueManager:
|
||||
)
|
||||
return set()
|
||||
|
||||
async def initialize_tenant_queues(self) -> None:
|
||||
try:
|
||||
active_tenants = await self.fetch_active_tenants()
|
||||
except aiohttp.ClientResponseError:
|
||||
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
|
||||
active_tenants = set()
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(
|
||||
AMQPConnectionError,
|
||||
ChannelInvalidStateError,
|
||||
),
|
||||
reraise=True,
|
||||
logger=logger,
|
||||
)
|
||||
async def initialize_tenant_queues(self, active_tenants: set) -> None:
|
||||
for tenant_id in active_tenants:
|
||||
await self.create_tenant_queues(tenant_id)
|
||||
|
||||
async def run(self) -> None:
|
||||
stop = asyncio.Event()
|
||||
async def run(self, active_tenants: set) -> None:
|
||||
|
||||
def signal_handler(*_):
|
||||
logger.info("Signal received, shutting down...")
|
||||
stop.set()
|
||||
await self.connect()
|
||||
await self.setup_exchanges()
|
||||
await self.initialize_tenant_queues(active_tenants=active_tenants)
|
||||
await self.setup_tenant_queue()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, signal_handler)
|
||||
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
||||
|
||||
async def close_channels(self) -> None:
|
||||
try:
|
||||
await self.connect()
|
||||
await self.setup_exchanges()
|
||||
await self.initialize_tenant_queues()
|
||||
await self.setup_tenant_queue()
|
||||
|
||||
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
||||
await stop.wait() # Run until stop signal received
|
||||
except asyncio.CancelledError:
|
||||
logger.warning("Operation cancelled.")
|
||||
if self.channel and not self.channel.is_closed:
|
||||
# Cancel queues to stop fetching messages
|
||||
logger.debug("Cancelling queues...")
|
||||
for tenant, queue in self.tenant_queues.items():
|
||||
await queue.cancel(self.consumer_tags[tenant])
|
||||
if self.tenant_exchange_queue:
|
||||
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
|
||||
while self.message_count != 0:
|
||||
logger.debug(f"Messages are still being processed: {self.message_count=} ")
|
||||
await asyncio.sleep(2)
|
||||
await self.channel.close(exc=asyncio.CancelledError)
|
||||
logger.debug("Channel closed.")
|
||||
else:
|
||||
logger.debug("No channel to close.")
|
||||
except ChannelClosed:
|
||||
logger.warning("Channel was already closed.")
|
||||
except ConnectionClosed:
|
||||
logger.warning("Connection was lost, unable to close channel.")
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred: {e}", exc_info=True)
|
||||
finally:
|
||||
await self.shutdown()
|
||||
logger.error(f"Error during channel shutdown: {e}")
|
||||
|
||||
async def close_connection(self) -> None:
|
||||
try:
|
||||
if self.connection and not self.connection.is_closed:
|
||||
await self.connection.close(exc=asyncio.CancelledError)
|
||||
logger.debug("Connection closed.")
|
||||
else:
|
||||
logger.debug("No connection to close.")
|
||||
except ConnectionClosed:
|
||||
logger.warning("Connection was already closed.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing connection: {e}")
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
logger.info("Shutting down RabbitMQ handler...")
|
||||
if self.channel:
|
||||
# Cancel queues to stop fetching messages
|
||||
logger.debug("Cancelling queues...")
|
||||
for tenant, queue in self.tenant_queues.items():
|
||||
await queue.cancel(self.consumer_tags[tenant])
|
||||
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
|
||||
while self.message_count != 0:
|
||||
logger.debug(f"Messages are still being processed: {self.message_count=} ")
|
||||
time.sleep(2)
|
||||
await self.channel.close()
|
||||
if self.connection:
|
||||
await self.connection.close()
|
||||
await self.close_channels()
|
||||
await self.close_connection()
|
||||
logger.info("RabbitMQ handler shut down successfully.")
|
||||
|
||||
@ -1,15 +1,16 @@
|
||||
from typing import Callable, Union
|
||||
from typing import Callable
|
||||
|
||||
from dynaconf import Dynaconf
|
||||
from kn_utils.logging import logger
|
||||
|
||||
from pyinfra.storage.connection import get_storage
|
||||
from pyinfra.storage.utils import (
|
||||
download_data_as_specified_in_message,
|
||||
download_data_bytes_as_specified_in_message,
|
||||
upload_data_as_specified_in_message,
|
||||
DownloadedData,
|
||||
)
|
||||
|
||||
DataProcessor = Callable[[Union[dict, bytes], dict], Union[dict, list, str]]
|
||||
DataProcessor = Callable[[dict[str, DownloadedData] | DownloadedData, dict], dict | list | str]
|
||||
Callback = Callable[[dict], dict]
|
||||
|
||||
|
||||
@ -28,7 +29,9 @@ def make_download_process_upload_callback(data_processor: DataProcessor, setting
|
||||
|
||||
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
|
||||
|
||||
data = download_data_as_specified_in_message(storage, queue_message_payload)
|
||||
data: dict[str, DownloadedData] | DownloadedData = download_data_bytes_as_specified_in_message(
|
||||
storage, queue_message_payload
|
||||
)
|
||||
|
||||
result = data_processor(data, queue_message_payload)
|
||||
|
||||
|
||||
@ -10,8 +10,8 @@ import pika
|
||||
import pika.exceptions
|
||||
from dynaconf import Dynaconf
|
||||
from kn_utils.logging import logger
|
||||
from kn_utils.retry import retry
|
||||
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
|
||||
from retry import retry
|
||||
|
||||
from pyinfra.config.loader import validate_settings
|
||||
from pyinfra.config.validators import queue_manager_validators
|
||||
@ -42,6 +42,9 @@ class QueueManager:
|
||||
signal.signal(signal.SIGTERM, self._handle_stop_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_stop_signal)
|
||||
|
||||
self.max_retries = settings.rabbitmq.max_retries or 5
|
||||
self.max_delay = settings.rabbitmq.max_delay or 60
|
||||
|
||||
@staticmethod
|
||||
def create_connection_parameters(settings: Dynaconf):
|
||||
credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password)
|
||||
@ -54,9 +57,12 @@ class QueueManager:
|
||||
|
||||
return pika.ConnectionParameters(**pika_connection_params)
|
||||
|
||||
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger)
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=(pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker),
|
||||
reraise=True,
|
||||
)
|
||||
def establish_connection(self):
|
||||
# TODO: set sensible retry parameters
|
||||
if self.connection and self.connection.is_open:
|
||||
logger.debug("Connection to RabbitMQ already established.")
|
||||
return
|
||||
@ -79,19 +85,31 @@ class QueueManager:
|
||||
logger.info("Connection to RabbitMQ established, channel open.")
|
||||
|
||||
def is_ready(self):
|
||||
self.establish_connection()
|
||||
return self.channel.is_open
|
||||
try:
|
||||
self.establish_connection()
|
||||
return self.channel.is_open
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to establish connection: {e}")
|
||||
return False
|
||||
|
||||
@retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger)
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=pika.exceptions.AMQPConnectionError,
|
||||
reraise=True,
|
||||
)
|
||||
def start_consuming(self, message_processor: Callable):
|
||||
on_message_callback = self._make_on_message_callback(message_processor)
|
||||
|
||||
try:
|
||||
self.establish_connection()
|
||||
self.channel.basic_consume(self.input_queue, on_message_callback)
|
||||
logger.info("Starting to consume messages...")
|
||||
self.channel.start_consuming()
|
||||
except Exception:
|
||||
logger.error("An unexpected error occurred while consuming messages. Consuming will stop.", exc_info=True)
|
||||
except pika.exceptions.AMQPConnectionError as e:
|
||||
logger.error(f"AMQP Connection Error: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred while consuming messages: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
self.stop_consuming()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import gzip
|
||||
import json
|
||||
from functools import singledispatch
|
||||
from typing import Union
|
||||
from typing import TypedDict
|
||||
|
||||
from kn_utils.logging import logger
|
||||
from pydantic import BaseModel, ValidationError
|
||||
@ -52,28 +52,27 @@ class TenantIdDossierIdFileIdUploadPayload(BaseModel):
|
||||
|
||||
|
||||
class TargetResponseFilePathDownloadPayload(BaseModel):
|
||||
targetFilePath: Union[str, dict]
|
||||
targetFilePath: str | dict[str, str]
|
||||
|
||||
|
||||
class TargetResponseFilePathUploadPayload(BaseModel):
|
||||
responseFilePath: str
|
||||
|
||||
|
||||
def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -> Union[dict, bytes]:
|
||||
class DownloadedData(TypedDict):
|
||||
data: bytes
|
||||
file_path: str
|
||||
|
||||
|
||||
def download_data_bytes_as_specified_in_message(
|
||||
storage: Storage, raw_payload: dict
|
||||
) -> dict[str, DownloadedData] | DownloadedData:
|
||||
"""Convenience function to download a file specified in a message payload.
|
||||
Supports both legacy and new payload formats. Also supports downloading multiple files at once, which should
|
||||
be specified in a dictionary under the 'targetFilePath' key with the file path as value.
|
||||
|
||||
If the content is compressed with gzip (.gz), it will be decompressed (-> bytes).
|
||||
If the content is a json file, it will be decoded (-> dict).
|
||||
If no file is specified in the payload or the file does not exist in storage, an exception will be raised.
|
||||
In all other cases, the content will be returned as is (-> bytes).
|
||||
|
||||
This function can be extended in the future as needed (e.g. handling of more file types), but since further
|
||||
requirements are not specified at this point in time, and it is unclear what these would entail, the code is kept
|
||||
simple for now to improve readability, maintainability and avoid refactoring efforts of generic solutions that
|
||||
weren't as generic as they seemed.
|
||||
|
||||
The data is downloaded as bytes and returned as a dictionary with the file path as key and the data as value.
|
||||
In case of several download targets, a nested dictionary is returned with the same keys and dictionaries with
|
||||
the file path and data as values.
|
||||
"""
|
||||
|
||||
try:
|
||||
@ -92,26 +91,25 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
|
||||
|
||||
|
||||
@singledispatch
|
||||
def _download(file_path_or_file_path_dict: Union[str, dict], storage: Storage) -> Union[dict, bytes]:
|
||||
def _download(
|
||||
file_path_or_file_path_dict: str | dict[str, str], storage: Storage
|
||||
) -> dict[str, DownloadedData] | DownloadedData:
|
||||
pass
|
||||
|
||||
|
||||
@_download.register(str)
|
||||
def _download_single_file(file_path: str, storage: Storage) -> bytes:
|
||||
def _download_single_file(file_path: str, storage: Storage) -> DownloadedData:
|
||||
if not storage.exists(file_path):
|
||||
raise FileNotFoundError(f"File '{file_path}' does not exist in storage.")
|
||||
|
||||
data = storage.get_object(file_path)
|
||||
|
||||
data = gzip.decompress(data) if ".gz" in file_path else data
|
||||
data = json.loads(data.decode("utf-8")) if ".json" in file_path else data
|
||||
logger.info(f"Downloaded {file_path} from storage.")
|
||||
|
||||
return data
|
||||
return DownloadedData(data=data, file_path=file_path)
|
||||
|
||||
|
||||
@_download.register(dict)
|
||||
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict:
|
||||
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict[str, DownloadedData]:
|
||||
return {key: _download(value, storage) for key, value in file_path_dict.items()}
|
||||
|
||||
|
||||
|
||||
@ -3,8 +3,10 @@ import json
|
||||
from azure.monitor.opentelemetry import configure_azure_monitor
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
|
||||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||
from opentelemetry.instrumentation.pika import PikaInstrumentor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
@ -18,7 +20,6 @@ from opentelemetry.sdk.trace.export import (
|
||||
|
||||
from pyinfra.config.loader import validate_settings
|
||||
from pyinfra.config.validators import opentelemetry_validators
|
||||
from kn_utils.logging import logger
|
||||
|
||||
|
||||
class JsonSpanExporter(SpanExporter):
|
||||
@ -84,8 +85,11 @@ def get_exporter(settings: Dynaconf):
|
||||
)
|
||||
|
||||
|
||||
def instrument_pika():
|
||||
PikaInstrumentor().instrument()
|
||||
def instrument_pika(dynamic_queues: bool):
|
||||
if dynamic_queues:
|
||||
AioPikaInstrumentor().instrument()
|
||||
else:
|
||||
PikaInstrumentor().instrument()
|
||||
|
||||
|
||||
def instrument_app(app: FastAPI, excluded_urls: str = "/health,/ready,/prometheus"):
|
||||
|
||||
@ -1,16 +1,33 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import uvicorn
|
||||
from dynaconf import Dynaconf
|
||||
from fastapi import FastAPI
|
||||
from kn_utils.logging import logger
|
||||
from kn_utils.retry import retry
|
||||
|
||||
from pyinfra.config.loader import validate_settings
|
||||
from pyinfra.config.validators import webserver_validators
|
||||
|
||||
|
||||
class PyInfraUvicornServer(uvicorn.Server):
|
||||
# this is a workaround to enable custom signal handlers
|
||||
# https://github.com/encode/uvicorn/issues/1579
|
||||
def install_signal_handlers(self):
|
||||
pass
|
||||
|
||||
|
||||
@retry(
|
||||
tries=5,
|
||||
exceptions=Exception,
|
||||
reraise=True,
|
||||
)
|
||||
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
||||
validate_settings(settings, validators=webserver_validators)
|
||||
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
|
||||
@ -20,11 +37,43 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
|
||||
"""Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
|
||||
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
|
||||
"""
|
||||
thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING))
|
||||
|
||||
def run_server():
|
||||
retries = 5
|
||||
for attempt in range(retries):
|
||||
try:
|
||||
uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt < retries - 1: # if it's not the last attempt
|
||||
logger.warning(f"Attempt {attempt + 1} failed to start the server: {e}. Retrying...")
|
||||
time.sleep(2**attempt) # exponential backoff
|
||||
else:
|
||||
logger.error(f"Failed to start the server after {retries} attempts: {e}")
|
||||
raise
|
||||
|
||||
thread = threading.Thread(target=run_server)
|
||||
thread.daemon = True
|
||||
return thread
|
||||
|
||||
|
||||
async def run_async_webserver(app: FastAPI, port: int, host: str):
|
||||
"""Run the FastAPI web server async."""
|
||||
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
|
||||
server = PyInfraUvicornServer(config)
|
||||
|
||||
try:
|
||||
await server.serve()
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Webserver was cancelled.")
|
||||
server.should_exit = True
|
||||
await server.shutdown()
|
||||
except Exception as e:
|
||||
logger.error(f"Error while running the webserver: {e}", exc_info=True)
|
||||
finally:
|
||||
logger.info("Webserver has been shut down.")
|
||||
|
||||
|
||||
HealthFunction = Callable[[], bool]
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pyinfra"
|
||||
version = "3.1.0"
|
||||
version = "4.1.0"
|
||||
description = ""
|
||||
authors = ["Team Research <research@knecon.com>"]
|
||||
license = "All rights reseverd"
|
||||
@ -18,35 +18,43 @@ azure-storage-blob = "^12.13"
|
||||
# misc utils
|
||||
funcy = "^2"
|
||||
pycryptodome = "^3.19"
|
||||
# research shared packages
|
||||
kn-utils = { version = "^0.2.7", source = "gitlab-research" }
|
||||
fastapi = "^0.109.0"
|
||||
uvicorn = "^0.26.0"
|
||||
# [tool.poetry.group.telemetry.dependencies]
|
||||
opentelemetry-instrumentation-pika = "^0.46b0"
|
||||
opentelemetry-exporter-otlp = "^1.25.0"
|
||||
opentelemetry-instrumentation = "^0.46b0"
|
||||
opentelemetry-api = "^1.25.0"
|
||||
opentelemetry-sdk = "^1.25.0"
|
||||
opentelemetry-exporter-otlp-proto-http = "^1.25.0"
|
||||
opentelemetry-instrumentation-flask = "^0.46b0"
|
||||
opentelemetry-instrumentation-requests = "^0.46b0"
|
||||
opentelemetry-instrumentation-fastapi = "^0.46b0"
|
||||
|
||||
# DONT USE GROUPS BECAUSE THEY ARE NOT INSTALLED FOR PACKAGES
|
||||
# [tool.poetry.group.internal.dependencies] <<< THIS IS NOT WORKING
|
||||
kn-utils = { version = ">=0.4.0", source = "nexus" }
|
||||
# We set all opentelemetry dependencies to lower bound because the image classification service depends on a protobuf version <4, but does not use proto files.
|
||||
# Therefore, we allow latest possible protobuf version in the services which use proto files. As soon as the dependency issue is fixed set this to the latest possible opentelemetry version
|
||||
opentelemetry-instrumentation-pika = ">=0.46b0,<0.50"
|
||||
opentelemetry-exporter-otlp = ">=1.25.0,<1.29"
|
||||
opentelemetry-instrumentation = ">=0.46b0,<0.50"
|
||||
opentelemetry-api = ">=1.25.0,<1.29"
|
||||
opentelemetry-sdk = ">=1.25.0,<1.29"
|
||||
opentelemetry-exporter-otlp-proto-http = ">=1.25.0,<1.29"
|
||||
opentelemetry-instrumentation-flask = ">=0.46b0,<0.50"
|
||||
opentelemetry-instrumentation-requests = ">=0.46b0,<0.50"
|
||||
opentelemetry-instrumentation-fastapi = ">=0.46b0,<0.50"
|
||||
opentelemetry-instrumentation-aio-pika = ">=0.46b0,<0.50"
|
||||
wcwidth = "<=0.2.12"
|
||||
azure-monitor-opentelemetry = "^1.6.0"
|
||||
aio-pika = "^9.4.2"
|
||||
aiohttp = "^3.9.5"
|
||||
tenacity = "^8.5.0"
|
||||
|
||||
# THIS IS NOT AVAILABLE FOR SERVICES THAT IMPLEMENT PYINFRA
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^7"
|
||||
ipykernel = "^6.26.0"
|
||||
black = "^23.10"
|
||||
black = "^24.10"
|
||||
pylint = "^3"
|
||||
coverage = "^7.3"
|
||||
requests = "^2.31"
|
||||
pre-commit = "^3.6.0"
|
||||
cyclonedx-bom = "^4.1.1"
|
||||
dvc = "^3.51.2"
|
||||
dvc-azure = "^3.1.0"
|
||||
deepdiff = "^7.0.1"
|
||||
pytest-cov = "^5.0.0"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = "6.0"
|
||||
@ -76,17 +84,18 @@ disable = [
|
||||
"R0904",
|
||||
"R0913",
|
||||
"R0914",
|
||||
"W0511"
|
||||
"W0511",
|
||||
]
|
||||
docstring-min-length = 3
|
||||
|
||||
[[tool.poetry.source]]
|
||||
name = "PyPI"
|
||||
name = "pypi-proxy"
|
||||
url = "https://nexus.knecon.com/repository/pypi-proxy/simple"
|
||||
priority = "primary"
|
||||
|
||||
[[tool.poetry.source]]
|
||||
name = "gitlab-research"
|
||||
url = "https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi/simple"
|
||||
name = "nexus"
|
||||
url = "https://nexus.knecon.com/repository/python/simple"
|
||||
priority = "explicit"
|
||||
|
||||
[build-system]
|
||||
|
||||
17
scripts/send_sigterm.py
Normal file
17
scripts/send_sigterm.py
Normal file
@ -0,0 +1,17 @@
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
|
||||
# BE CAREFUL WITH THIS SCRIPT - THIS SIMULATES A SIGTERM FROM KUBERNETES
|
||||
target_pid = int(input("Enter the PID of the target script: "))
|
||||
|
||||
print(f"Sending SIGTERM to PID {target_pid}...")
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
os.kill(target_pid, signal.SIGTERM)
|
||||
print("SIGTERM sent.")
|
||||
except ProcessLookupError:
|
||||
print("Process not found.")
|
||||
except PermissionError:
|
||||
print("Permission denied. Are you trying to signal a process you don't own?")
|
||||
39
scripts/setup/devenvsetup.sh
Normal file
39
scripts/setup/devenvsetup.sh
Normal file
@ -0,0 +1,39 @@
|
||||
#!/bin/bash
|
||||
python_version=$1
|
||||
nexus_user=$2
|
||||
nexus_password=$3
|
||||
|
||||
# cookiecutter https://gitlab.knecon.com/knecon/research/template-python-project.git --checkout master
|
||||
# latest_dir=$(ls -td -- */ | head -n 1) # should be the dir cookiecutter just created
|
||||
|
||||
# cd $latest_dir
|
||||
|
||||
pyenv install $python_version
|
||||
pyenv local $python_version
|
||||
pyenv shell $python_version
|
||||
|
||||
# install poetry globally (PREFERRED), only need to install it once
|
||||
# curl -sSL https://install.python-poetry.org | python3 -
|
||||
|
||||
# remember to update poetry once in a while
|
||||
poetry self update
|
||||
|
||||
# install poetry in current python environment, can lead to multiple instances of poetry being installed on one system (DISPREFERRED)
|
||||
# pip install --upgrade pip
|
||||
# pip install poetry
|
||||
|
||||
poetry config virtualenvs.in-project true
|
||||
poetry config installer.max-workers 10
|
||||
poetry config repositories.pypi-proxy "https://nexus.knecon.com/repository/pypi-proxy/simple"
|
||||
poetry config http-basic.pypi-proxy ${nexus_user} ${nexus_password}
|
||||
poetry config repositories.nexus https://nexus.knecon.com/repository/python/simple
|
||||
poetry config http-basic.nexus ${nexus_user} ${nexus_password}
|
||||
|
||||
poetry env use $(pyenv which python)
|
||||
poetry install --with=dev
|
||||
poetry update
|
||||
|
||||
source .venv/bin/activate
|
||||
|
||||
pre-commit install
|
||||
pre-commit autoupdate
|
||||
@ -27,6 +27,8 @@ def storage(storage_backend, settings):
|
||||
def queue_manager(settings):
|
||||
settings.rabbitmq_heartbeat = 10
|
||||
settings.connection_sleep = 5
|
||||
settings.rabbitmq.max_retries = 3
|
||||
settings.rabbitmq.max_delay = 10
|
||||
queue_manager = QueueManager(settings)
|
||||
yield queue_manager
|
||||
|
||||
|
||||
6
tests/data.dvc
Normal file
6
tests/data.dvc
Normal file
@ -0,0 +1,6 @@
|
||||
outs:
|
||||
- md5: 75cc98b7c8fcf782a7d4941594e6bc12.dir
|
||||
size: 134913
|
||||
nfiles: 9
|
||||
hash: md5
|
||||
path: data
|
||||
@ -11,10 +11,18 @@ def exporter(settings):
|
||||
return get_exporter(settings)
|
||||
|
||||
|
||||
class TestOpenTelemetry:
|
||||
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
|
||||
setup_trace(settings, exporter=exporter)
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_test_trace(settings, exporter, tracing_type):
|
||||
settings.tracing.type = tracing_type
|
||||
setup_trace(settings, exporter=exporter)
|
||||
|
||||
|
||||
class TestOpenTelemetry:
|
||||
@pytest.mark.xfail(
|
||||
reason="Azure Monitor requires a connection string. Therefore the test is allowed to fail in this case."
|
||||
)
|
||||
@pytest.mark.parametrize("tracing_type", ["opentelemetry", "azure_monitor"])
|
||||
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
|
||||
instrument_pika()
|
||||
|
||||
queue_manager.purge_queues()
|
||||
@ -31,21 +39,3 @@ class TestOpenTelemetry:
|
||||
assert (
|
||||
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
|
||||
)
|
||||
|
||||
# def test_webserver_requests_are_traced(self, settings):
|
||||
# settings.tracing.opentelemetry.exporter = "console"
|
||||
# settings.tracing.enabled = True
|
||||
#
|
||||
# app = FastAPI()
|
||||
#
|
||||
# @app.get("/test")
|
||||
# def test():
|
||||
# return {"test": "test"}
|
||||
#
|
||||
# thread = create_webserver_thread_from_settings(app, settings)
|
||||
# thread.start()
|
||||
# sleep(1)
|
||||
#
|
||||
# requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test")
|
||||
#
|
||||
# thread.join(timeout=1)
|
||||
@ -3,7 +3,6 @@ from sys import stdout
|
||||
from time import sleep
|
||||
|
||||
import pika
|
||||
import pytest
|
||||
from kn_utils.logging import logger
|
||||
|
||||
logger.remove()
|
||||
@ -7,7 +7,7 @@ from fastapi import FastAPI
|
||||
|
||||
from pyinfra.storage.connection import get_storage_for_tenant
|
||||
from pyinfra.storage.utils import (
|
||||
download_data_as_specified_in_message,
|
||||
download_data_bytes_as_specified_in_message,
|
||||
upload_data_as_specified_in_message,
|
||||
)
|
||||
from pyinfra.utils.cipher import encrypt
|
||||
@ -139,16 +139,6 @@ def payload(payload_type):
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def expected_data(payload_type):
|
||||
if payload_type == "target_response_file_path":
|
||||
return {"data": "success"}
|
||||
elif payload_type == "dossier_id_file_id":
|
||||
return {"dossierId": "test", "fileId": "file", "data": "success"}
|
||||
elif payload_type == "target_file_dict":
|
||||
return {"file_1": {"data": "success"}, "file_2": {"data": "success"}}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"payload_type",
|
||||
[
|
||||
@ -160,17 +150,17 @@ def expected_data(payload_type):
|
||||
)
|
||||
@pytest.mark.parametrize("storage_backend", ["azure", "s3"], scope="class")
|
||||
class TestDownloadAndUploadFromMessage:
|
||||
def test_download_and_upload_from_message(self, storage, payload, expected_data, payload_type):
|
||||
def test_download_and_upload_from_message(self, storage, payload, payload_type):
|
||||
storage.clear_bucket()
|
||||
|
||||
upload_data = expected_data if payload_type != "target_file_dict" else expected_data["file_1"]
|
||||
storage.put_object("test/file.target.json.gz", gzip.compress(json.dumps(upload_data).encode()))
|
||||
result = {"process_result": "success"}
|
||||
storage_data = {**payload, "data": result}
|
||||
packed_data = gzip.compress(json.dumps(storage_data).encode())
|
||||
|
||||
data = download_data_as_specified_in_message(storage, payload)
|
||||
storage.put_object("test/file.target.json.gz", packed_data)
|
||||
|
||||
assert data == expected_data
|
||||
|
||||
upload_data_as_specified_in_message(storage, payload, expected_data)
|
||||
_ = download_data_bytes_as_specified_in_message(storage, payload)
|
||||
upload_data_as_specified_in_message(storage, payload, result)
|
||||
data = json.loads(gzip.decompress(storage.get_object("test/file.response.json.gz")).decode())
|
||||
|
||||
assert data == {**payload, "data": expected_data}
|
||||
assert data == storage_data
|
||||
83
tests/unit_test/utils_download_test.py
Normal file
83
tests/unit_test/utils_download_test.py
Normal file
@ -0,0 +1,83 @@
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
from pyinfra.storage.utils import (
|
||||
download_data_bytes_as_specified_in_message,
|
||||
upload_data_as_specified_in_message,
|
||||
DownloadedData,
|
||||
)
|
||||
from pyinfra.storage.storages.storage import Storage
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_storage():
|
||||
with patch("pyinfra.storage.utils.Storage") as MockStorage:
|
||||
yield MockStorage()
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
{
|
||||
"raw_payload": {
|
||||
"tenantId": "tenant1",
|
||||
"dossierId": "dossier1",
|
||||
"fileId": "file1",
|
||||
"targetFileExtension": "txt",
|
||||
"responseFileExtension": "json",
|
||||
},
|
||||
"expected_result": {
|
||||
"data": b'{"key": "value"}',
|
||||
"file_path": "tenant1/dossier1/file1.txt"
|
||||
}
|
||||
},
|
||||
{
|
||||
"raw_payload": {
|
||||
"targetFilePath": "some/path/to/file.txt.gz",
|
||||
"responseFilePath": "some/path/to/file.json"
|
||||
},
|
||||
"expected_result": {
|
||||
"data": b'{"key": "value"}',
|
||||
"file_path": "some/path/to/file.txt.gz"
|
||||
}
|
||||
},
|
||||
{
|
||||
"raw_payload": {
|
||||
"targetFilePath": {
|
||||
"file1": "some/path/to/file1.txt.gz",
|
||||
"file2": "some/path/to/file2.txt.gz"
|
||||
},
|
||||
"responseFilePath": "some/path/to/file.json"
|
||||
},
|
||||
"expected_result": {
|
||||
"file1": {
|
||||
"data": b'{"key": "value"}',
|
||||
"file_path": "some/path/to/file1.txt.gz"
|
||||
},
|
||||
"file2": {
|
||||
"data": b'{"key": "value"}',
|
||||
"file_path": "some/path/to/file2.txt.gz"
|
||||
}
|
||||
}
|
||||
},
|
||||
]
|
||||
)
|
||||
def payload_and_expected_result(request):
|
||||
return request.param
|
||||
|
||||
def test_download_data_bytes_as_specified_in_message(mock_storage, payload_and_expected_result):
|
||||
raw_payload = payload_and_expected_result["raw_payload"]
|
||||
expected_result = payload_and_expected_result["expected_result"]
|
||||
mock_storage.get_object.return_value = b'{"key": "value"}'
|
||||
|
||||
result = download_data_bytes_as_specified_in_message(mock_storage, raw_payload)
|
||||
|
||||
assert isinstance(result, dict)
|
||||
assert result == expected_result
|
||||
mock_storage.get_object.assert_called()
|
||||
|
||||
def test_upload_data_as_specified_in_message(mock_storage, payload_and_expected_result):
|
||||
raw_payload = payload_and_expected_result["raw_payload"]
|
||||
data = {"key": "value"}
|
||||
upload_data_as_specified_in_message(mock_storage, raw_payload, data)
|
||||
mock_storage.put_object.assert_called_once()
|
||||
Loading…
x
Reference in New Issue
Block a user