Compare commits
104 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ef4246d1e | ||
|
|
841c492639 | ||
|
|
ead069d3a7 | ||
|
|
044ea6cf0a | ||
|
|
ff7547e2c6 | ||
|
|
fbf79ef758 | ||
|
|
f382887d40 | ||
|
|
5c4400aa8b | ||
|
|
5ce66f18a0 | ||
|
|
ea0c55930a | ||
|
|
87f57e2244 | ||
|
|
3fb8c4e641 | ||
|
|
e23f63acf0 | ||
|
|
d3fecc518e | ||
|
|
341500d463 | ||
|
|
e002f77fd5 | ||
|
|
3c6d8f2dcc | ||
|
|
f6d6ba40bb | ||
|
|
6a0bbad108 | ||
|
|
527a671a75 | ||
|
|
cf91189728 | ||
|
|
61a6d0eeed | ||
|
|
bc0b355ff9 | ||
|
|
235e27b74c | ||
|
|
1540c2894e | ||
|
|
9b60594ce1 | ||
|
|
3d3c76b466 | ||
|
|
9d4ec84b49 | ||
|
|
8891249d7a | ||
|
|
e51e5c33eb | ||
|
|
04c90533b6 | ||
|
|
86af05c12c | ||
|
|
c6e336cb35 | ||
|
|
bf6f95f3e0 | ||
|
|
ed2bd1ec86 | ||
|
|
9906f68e0a | ||
|
|
0af648d66c | ||
|
|
46dc1fdce4 | ||
|
|
bd2f0b9b9a | ||
|
|
131afd7d3e | ||
|
|
98532c60ed | ||
|
|
45377ba172 | ||
|
|
f855224e29 | ||
|
|
541219177f | ||
|
|
4119a7d7d7 | ||
|
|
e2edfa7260 | ||
|
|
b70b16c541 | ||
|
|
e8d9326e48 | ||
|
|
9669152e14 | ||
|
|
ed3f8088e1 | ||
|
|
66eaa9a748 | ||
|
|
3a04359320 | ||
|
|
b46fcbd977 | ||
|
|
e75df42bec | ||
|
|
3bab86fe83 | ||
|
|
c5d53b8665 | ||
|
|
09d39930e7 | ||
|
|
a81f1bf31a | ||
|
|
0783e95d22 | ||
|
|
8ec13502a9 | ||
|
|
43881de526 | ||
|
|
67c30a5620 | ||
|
|
8e21b2144c | ||
|
|
5b45cae9a0 | ||
|
|
f2a5a2ea0e | ||
|
|
2133933d25 | ||
|
|
4c8dc6ccc0 | ||
|
|
5f31e2b15f | ||
|
|
88aef57c5f | ||
|
|
2b129b35f4 | ||
|
|
facb9726f9 | ||
|
|
b6a2069a6a | ||
|
|
f626ef2e6f | ||
|
|
318779413a | ||
|
|
f27b1fbba1 | ||
|
|
f2018f9c86 | ||
|
|
a5167d1230 | ||
|
|
1e939febc2 | ||
|
|
564f2cbb43 | ||
|
|
fa44f36088 | ||
|
|
2970823cc1 | ||
|
|
dba348a621 | ||
|
|
5020e54dcc | ||
|
|
2bc332831e | ||
|
|
b3f1529be2 | ||
|
|
789f6a7f7c | ||
|
|
06ce8bbb22 | ||
|
|
fdde56991b | ||
|
|
cb8509b120 | ||
|
|
47b42e95e2 | ||
|
|
536284ed84 | ||
|
|
aeac1c58f9 | ||
|
|
b12b1ce42b | ||
|
|
50b7a877e9 | ||
|
|
f3d0f24ea6 | ||
|
|
70d3a210a1 | ||
|
|
f935056fa9 | ||
|
|
f175633f30 | ||
|
|
ceac21c1ef | ||
|
|
0d232226fd | ||
|
|
9d55b3be89 | ||
|
|
edba6fc4da | ||
|
|
c5d8a6ed84 | ||
|
|
c16000c774 |
2
.dvc/.gitignore
vendored
Normal file
2
.dvc/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
/config.local
|
||||||
|
/cache
|
||||||
5
.dvc/config
Normal file
5
.dvc/config
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[core]
|
||||||
|
remote = azure
|
||||||
|
['remote "azure"']
|
||||||
|
url = azure://pyinfra-dvc
|
||||||
|
connection_string =
|
||||||
3
.dvcignore
Normal file
3
.dvcignore
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# Add patterns of files dvc should ignore, which could improve
|
||||||
|
# the performance. Learn more at
|
||||||
|
# https://dvc.org/doc/user-guide/dvcignore
|
||||||
@ -1,49 +1,23 @@
|
|||||||
|
# CI for services, check gitlab repo for python package CI
|
||||||
include:
|
include:
|
||||||
- project: "Gitlab/gitlab"
|
- project: "Gitlab/gitlab"
|
||||||
ref: 0.3.0
|
ref: main
|
||||||
file: "/ci-templates/research/python_pkg_venv_test_build_release_gitlab-ci.yml"
|
file: "/ci-templates/research/python_pkg-test-build-release.gitlab-ci.yml"
|
||||||
|
|
||||||
default:
|
|
||||||
image: python:3.10
|
|
||||||
|
|
||||||
|
# set project variables here
|
||||||
variables:
|
variables:
|
||||||
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
|
NEXUS_PROJECT_DIR: research # subfolder in Nexus docker-gin where your container will be stored
|
||||||
GITLAB_PYPI_URL: https://gitlab.knecon.com/api/v4/projects/${CI_PROJECT_ID}/packages/pypi
|
IMAGENAME: $CI_PROJECT_NAME # if the project URL is gitlab.example.com/group-name/project-1, CI_PROJECT_NAME is project-1
|
||||||
PYPI_REGISTRY_RESEARCH: https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi
|
REPORTS_DIR: reports
|
||||||
POETRY_SOURCE_REF_RESEARCH: gitlab-research
|
FF_USE_FASTZIP: "true" # enable fastzip - a faster zip implementation that also supports level configuration.
|
||||||
PYPI_REGISTRY_RED: https://gitlab.knecon.com/api/v4/groups/12/-/packages/pypi
|
ARTIFACT_COMPRESSION_LEVEL: default # can also be set to fastest, fast, slow and slowest. If just enabling fastzip is not enough try setting this to fastest or fast.
|
||||||
POETRY_SOURCE_REF_RED: gitlab-red
|
CACHE_COMPRESSION_LEVEL: default # same as above, but for caches
|
||||||
PYPI_REGISTRY_FFORESIGHT: https://gitlab.knecon.com/api/v4/groups/269/-/packages/pypi
|
# TRANSFER_METER_FREQUENCY: 5s # will display transfer progress every 5 seconds for artifacts and remote caches. For debugging purposes.
|
||||||
POETRY_SOURCE_REF_FFORESIGHT: gitlab-fforesight
|
|
||||||
# POETRY_HOME: /opt/poetry
|
|
||||||
|
|
||||||
setup-poetry-venv:
|
|
||||||
stage: setup
|
|
||||||
script:
|
|
||||||
- env # check env vars
|
|
||||||
# install poetry & return versions
|
|
||||||
- pip install --upgrade pip
|
|
||||||
- pip -V
|
|
||||||
- python -V
|
|
||||||
- pip install poetry
|
|
||||||
- poetry -V
|
|
||||||
# configure poetry
|
|
||||||
- poetry config installer.max-workers 10
|
|
||||||
- poetry config virtualenvs.in-project true
|
|
||||||
- poetry config repositories.${POETRY_SOURCE_REF_RESEARCH} ${PYPI_REGISTRY_RESEARCH}
|
|
||||||
- poetry config http-basic.${POETRY_SOURCE_REF_RESEARCH} ${CI_REGISTRY_USER} ${CI_JOB_TOKEN}
|
|
||||||
- poetry config repositories.${POETRY_SOURCE_REF_RED} ${PYPI_REGISTRY_RED}
|
|
||||||
- poetry config http-basic.${POETRY_SOURCE_REF_RED} ${CI_REGISTRY_USER} ${CI_JOB_TOKEN}
|
|
||||||
- poetry config repositories.${POETRY_SOURCE_REF_FFORESIGHT} ${PYPI_REGISTRY_FFORESIGHT}
|
|
||||||
- poetry config http-basic.${POETRY_SOURCE_REF_FFORESIGHT} ${CI_REGISTRY_USER} ${CI_JOB_TOKEN}
|
|
||||||
# create and activate venv
|
|
||||||
- poetry env use $(which python)
|
|
||||||
- source .venv/bin/activate
|
|
||||||
- python -m ensurepip
|
|
||||||
- env # check env vars again
|
|
||||||
# install from poetry.lock file
|
|
||||||
- poetry install --all-extras -vvv
|
|
||||||
|
|
||||||
run-tests:
|
############
|
||||||
script:
|
# UNIT TESTS
|
||||||
- echo "Disabled until we have an automated way to run docker compose before tests."
|
unit-tests:
|
||||||
|
variables:
|
||||||
|
###### UPDATE/EDIT ######
|
||||||
|
UNIT_TEST_DIR: "tests/unit_test"
|
||||||
|
|||||||
@ -5,7 +5,7 @@ default_language_version:
|
|||||||
python: python3.10
|
python: python3.10
|
||||||
repos:
|
repos:
|
||||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||||
rev: v4.6.0
|
rev: v5.0.0
|
||||||
hooks:
|
hooks:
|
||||||
- id: trailing-whitespace
|
- id: trailing-whitespace
|
||||||
- id: end-of-file-fixer
|
- id: end-of-file-fixer
|
||||||
@ -26,6 +26,7 @@ repos:
|
|||||||
rev: v3.0.0a5
|
rev: v3.0.0a5
|
||||||
hooks:
|
hooks:
|
||||||
- id: pylint
|
- id: pylint
|
||||||
|
language: system
|
||||||
args:
|
args:
|
||||||
- --disable=C0111,R0903
|
- --disable=C0111,R0903
|
||||||
- --max-line-length=120
|
- --max-line-length=120
|
||||||
@ -38,7 +39,7 @@ repos:
|
|||||||
- --profile black
|
- --profile black
|
||||||
|
|
||||||
- repo: https://github.com/psf/black
|
- repo: https://github.com/psf/black
|
||||||
rev: 24.4.2
|
rev: 24.10.0
|
||||||
hooks:
|
hooks:
|
||||||
- id: black
|
- id: black
|
||||||
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
|
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
|
||||||
@ -46,7 +47,7 @@ repos:
|
|||||||
- --line-length=120
|
- --line-length=120
|
||||||
|
|
||||||
- repo: https://github.com/compilerla/conventional-pre-commit
|
- repo: https://github.com/compilerla/conventional-pre-commit
|
||||||
rev: v3.2.0
|
rev: v3.6.0
|
||||||
hooks:
|
hooks:
|
||||||
- id: conventional-pre-commit
|
- id: conventional-pre-commit
|
||||||
pass_filenames: false
|
pass_filenames: false
|
||||||
|
|||||||
@ -1 +1 @@
|
|||||||
3.10.12
|
3.10
|
||||||
|
|||||||
20
README.md
20
README.md
@ -6,6 +6,7 @@
|
|||||||
4. [ Module Installation ](#module-installation)
|
4. [ Module Installation ](#module-installation)
|
||||||
5. [ Scripts ](#scripts)
|
5. [ Scripts ](#scripts)
|
||||||
6. [ Tests ](#tests)
|
6. [ Tests ](#tests)
|
||||||
|
7. [ OpenTelemetry Protobuf Dependency Hell ](#opentelemetry-protobuf-dependency-hell)
|
||||||
|
|
||||||
## About
|
## About
|
||||||
|
|
||||||
@ -33,7 +34,7 @@ the [complete example](pyinfra/examples.py).
|
|||||||
| Environment Variable | Internal / .toml Name | Description |
|
| Environment Variable | Internal / .toml Name | Description |
|
||||||
| ------------------------------------------ | --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
| ------------------------------------------ | --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
| LOGGING\_\_LEVEL | logging.level | Log level |
|
| LOGGING\_\_LEVEL | logging.level | Log level |
|
||||||
| CONCURRENCY\_\_ENABLED | concurrency.enabled | Enable multi tenant queue mode |
|
| DYNAMIC_TENANT_QUEUES\_\_ENABLED | dynamic_tenant_queues.enabled | Enable dynamically created per-tenant queues |
|
||||||
| METRICS\_\_PROMETHEUS\_\_ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
|
| METRICS\_\_PROMETHEUS\_\_ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
|
||||||
| METRICS\_\_PROMETHEUS\_\_PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
|
| METRICS\_\_PROMETHEUS\_\_PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
|
||||||
| WEBSERVER\_\_HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
|
| WEBSERVER\_\_HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
|
||||||
@ -72,6 +73,17 @@ the [complete example](pyinfra/examples.py).
|
|||||||
| TRACING\_\_OPENTELEMETRY\_\_EXPORTER | tracing.opentelemetry.exporter | Name of exporter |
|
| TRACING\_\_OPENTELEMETRY\_\_EXPORTER | tracing.opentelemetry.exporter | Name of exporter |
|
||||||
| KUBERNETES\_\_POD_NAME | kubernetes.pod_name | Service pod name |
|
| KUBERNETES\_\_POD_NAME | kubernetes.pod_name | Service pod name |
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
**IMPORTANT** you need to set the following environment variables before running the setup script:
|
||||||
|
- ``$NEXUS_USER`` your Nexus user (usually equal to firstname.lastname@knecon.com)
|
||||||
|
- ``$NEXUS_PASSWORD`` your Nexus password (usually equal to your Azure Login)
|
||||||
|
|
||||||
|
```shell
|
||||||
|
# create venv and activate it
|
||||||
|
source ./scripts/setup/devenvsetup.sh {{ cookiecutter.python_version }} $NEXUS_USER $NEXUS_PASSWORD
|
||||||
|
source .venv/bin/activate
|
||||||
|
```
|
||||||
|
|
||||||
### OpenTelemetry
|
### OpenTelemetry
|
||||||
|
|
||||||
OpenTelemetry (via its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be
|
OpenTelemetry (via its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be
|
||||||
@ -200,3 +212,9 @@ $ python scripts/send_request.py
|
|||||||
|
|
||||||
Tests require a running minio and rabbitmq container, meaning you have to run `docker compose up` in the tests folder
|
Tests require a running minio and rabbitmq container, meaning you have to run `docker compose up` in the tests folder
|
||||||
before running the tests.
|
before running the tests.
|
||||||
|
|
||||||
|
## OpenTelemetry Protobuf Dependency Hell
|
||||||
|
|
||||||
|
**Note**: Status 2025/01/09: the currently used `opentelemetry-exporter-otlp-proto-http` version `1.25.0` requires
|
||||||
|
a `protobuf` version < `5.x.x` and is not compatible with the latest protobuf version `5.27.x`. This is an [open issue](https://github.com/open-telemetry/opentelemetry-python/issues/3958) in opentelemetry, which matters because [support for 4.25.x ends in Q2 '25](https://protobuf.dev/support/version-support/#python).
|
||||||
|
Therefore, we should keep this in mind and update the dependency once opentelemetry includes support for `protobuf 5.27.x`.
|
||||||
|
|||||||
6380
poetry.lock
generated
6380
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,9 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
from aiormq.exceptions import AMQPConnectionError
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
@ -16,8 +20,79 @@ from pyinfra.webserver.prometheus import (
|
|||||||
from pyinfra.webserver.utils import (
|
from pyinfra.webserver.utils import (
|
||||||
add_health_check_endpoint,
|
add_health_check_endpoint,
|
||||||
create_webserver_thread_from_settings,
|
create_webserver_thread_from_settings,
|
||||||
|
run_async_webserver,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
shutdown_flag = False
|
||||||
|
|
||||||
|
|
||||||
|
async def graceful_shutdown(manager: AsyncQueueManager, queue_task, webserver_task):
|
||||||
|
global shutdown_flag
|
||||||
|
shutdown_flag = True
|
||||||
|
logger.info("SIGTERM received, shutting down gracefully...")
|
||||||
|
|
||||||
|
if queue_task and not queue_task.done():
|
||||||
|
queue_task.cancel()
|
||||||
|
|
||||||
|
# await queue manager shutdown
|
||||||
|
await asyncio.gather(queue_task, manager.shutdown(), return_exceptions=True)
|
||||||
|
|
||||||
|
if webserver_task and not webserver_task.done():
|
||||||
|
webserver_task.cancel()
|
||||||
|
|
||||||
|
# await webserver shutdown
|
||||||
|
await asyncio.gather(webserver_task, return_exceptions=True)
|
||||||
|
|
||||||
|
logger.info("Shutdown complete.")
|
||||||
|
|
||||||
|
|
||||||
|
async def run_async_queues(manager: AsyncQueueManager, app, port, host):
|
||||||
|
"""Run the async webserver and the async queue manager concurrently."""
|
||||||
|
queue_task = None
|
||||||
|
webserver_task = None
|
||||||
|
tenant_api_available = True
|
||||||
|
|
||||||
|
# add signal handler for SIGTERM and SIGINT
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
loop.add_signal_handler(
|
||||||
|
signal.SIGTERM, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
|
||||||
|
)
|
||||||
|
loop.add_signal_handler(
|
||||||
|
signal.SIGINT, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
active_tenants = await manager.fetch_active_tenants()
|
||||||
|
|
||||||
|
queue_task = asyncio.create_task(manager.run(active_tenants=active_tenants), name="queues")
|
||||||
|
webserver_task = asyncio.create_task(run_async_webserver(app, port, host), name="webserver")
|
||||||
|
await asyncio.gather(queue_task, webserver_task)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("Main task was cancelled, initiating shutdown.")
|
||||||
|
except AMQPConnectionError as e:
|
||||||
|
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
|
||||||
|
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
|
||||||
|
logger.warning("Tenant server did not answer - shutting down.")
|
||||||
|
tenant_api_available = False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
|
||||||
|
sys.exit(1)
|
||||||
|
finally:
|
||||||
|
if shutdown_flag:
|
||||||
|
logger.debug("Graceful shutdown already in progress.")
|
||||||
|
else:
|
||||||
|
logger.warning("Initiating shutdown due to error or manual interruption.")
|
||||||
|
if not tenant_api_available:
|
||||||
|
sys.exit(0)
|
||||||
|
if queue_task and not queue_task.done():
|
||||||
|
queue_task.cancel()
|
||||||
|
|
||||||
|
if webserver_task and not webserver_task.done():
|
||||||
|
webserver_task.cancel()
|
||||||
|
|
||||||
|
await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True)
|
||||||
|
logger.info("Shutdown complete.")
|
||||||
|
|
||||||
|
|
||||||
def start_standard_queue_consumer(
|
def start_standard_queue_consumer(
|
||||||
callback: Callback,
|
callback: Callback,
|
||||||
@ -45,10 +120,11 @@ def start_standard_queue_consumer(
|
|||||||
if settings.tracing.enabled:
|
if settings.tracing.enabled:
|
||||||
setup_trace(settings)
|
setup_trace(settings)
|
||||||
|
|
||||||
instrument_pika()
|
instrument_pika(dynamic_queues=settings.dynamic_tenant_queues.enabled)
|
||||||
instrument_app(app)
|
instrument_app(app)
|
||||||
|
|
||||||
if settings.concurrency.enabled:
|
if settings.dynamic_tenant_queues.enabled:
|
||||||
|
logger.info("Dynamic tenant queues enabled. Running async queues.")
|
||||||
config = RabbitMQConfig(
|
config = RabbitMQConfig(
|
||||||
host=settings.rabbitmq.host,
|
host=settings.rabbitmq.host,
|
||||||
port=settings.rabbitmq.port,
|
port=settings.rabbitmq.port,
|
||||||
@ -65,19 +141,29 @@ def start_standard_queue_consumer(
|
|||||||
pod_name=settings.kubernetes.pod_name,
|
pod_name=settings.kubernetes.pod_name,
|
||||||
)
|
)
|
||||||
manager = AsyncQueueManager(
|
manager = AsyncQueueManager(
|
||||||
config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback
|
config=config,
|
||||||
|
tenant_service_url=settings.storage.tenant_server.endpoint,
|
||||||
|
message_processor=callback,
|
||||||
|
max_concurrent_tasks=(
|
||||||
|
settings.asyncio.max_concurrent_tasks if hasattr(settings.asyncio, "max_concurrent_tasks") else 10
|
||||||
|
),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
logger.info("Dynamic tenant queues disabled. Running sync queues.")
|
||||||
manager = QueueManager(settings)
|
manager = QueueManager(settings)
|
||||||
|
|
||||||
app = add_health_check_endpoint(app, manager.is_ready)
|
app = add_health_check_endpoint(app, manager.is_ready)
|
||||||
|
|
||||||
webserver_thread = create_webserver_thread_from_settings(app, settings)
|
|
||||||
webserver_thread.start()
|
|
||||||
|
|
||||||
if isinstance(manager, AsyncQueueManager):
|
if isinstance(manager, AsyncQueueManager):
|
||||||
asyncio.run(manager.run())
|
asyncio.run(run_async_queues(manager, app, port=settings.webserver.port, host=settings.webserver.host))
|
||||||
|
|
||||||
elif isinstance(manager, QueueManager):
|
elif isinstance(manager, QueueManager):
|
||||||
manager.start_consuming(callback)
|
webserver = create_webserver_thread_from_settings(app, settings)
|
||||||
|
webserver.start()
|
||||||
|
try:
|
||||||
|
manager.start_consuming(callback)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An error occurred while consuming messages: {e}", exc_info=True)
|
||||||
|
sys.exit(1)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Behavior for type {type(manager)} is not defined")
|
logger.warning(f"Behavior for type {type(manager)} is not defined")
|
||||||
|
|||||||
@ -1,24 +1,26 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import concurrent.futures
|
||||||
import json
|
import json
|
||||||
import signal
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import Any, Callable, Dict, Set
|
from typing import Any, Callable, Dict, Set
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from aio_pika import ExchangeType, IncomingMessage, Message, connect_robust
|
from aio_pika import ExchangeType, IncomingMessage, Message, connect
|
||||||
from aio_pika.abc import (
|
from aio_pika.abc import (
|
||||||
AbstractChannel,
|
AbstractChannel,
|
||||||
AbstractConnection,
|
AbstractConnection,
|
||||||
AbstractExchange,
|
AbstractExchange,
|
||||||
AbstractIncomingMessage,
|
AbstractIncomingMessage,
|
||||||
|
AbstractQueue,
|
||||||
)
|
)
|
||||||
|
from aio_pika.exceptions import (
|
||||||
|
ChannelClosed,
|
||||||
|
ChannelInvalidStateError,
|
||||||
|
ConnectionClosed,
|
||||||
|
)
|
||||||
|
from aiormq.exceptions import AMQPConnectionError
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
from tenacity import (
|
from kn_utils.retry import retry
|
||||||
retry,
|
|
||||||
retry_if_exception_type,
|
|
||||||
stop_after_attempt,
|
|
||||||
wait_exponential_jitter,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -55,27 +57,60 @@ class AsyncQueueManager:
|
|||||||
config: RabbitMQConfig,
|
config: RabbitMQConfig,
|
||||||
tenant_service_url: str,
|
tenant_service_url: str,
|
||||||
message_processor: Callable[[Dict[str, Any]], Dict[str, Any]],
|
message_processor: Callable[[Dict[str, Any]], Dict[str, Any]],
|
||||||
|
max_concurrent_tasks: int = 10,
|
||||||
):
|
):
|
||||||
self.config = config
|
self.config = config
|
||||||
self.tenant_service_url = tenant_service_url
|
self.tenant_service_url = tenant_service_url
|
||||||
self.message_processor = message_processor
|
self.message_processor = message_processor
|
||||||
|
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
|
||||||
|
|
||||||
self.connection: AbstractConnection | None = None
|
self.connection: AbstractConnection | None = None
|
||||||
self.channel: AbstractChannel | None = None
|
self.channel: AbstractChannel | None = None
|
||||||
self.tenant_exchange: AbstractExchange | None = None
|
self.tenant_exchange: AbstractExchange | None = None
|
||||||
self.input_exchange: AbstractExchange | None = None
|
self.input_exchange: AbstractExchange | None = None
|
||||||
self.output_exchange: AbstractExchange | None = None
|
self.output_exchange: AbstractExchange | None = None
|
||||||
|
self.tenant_exchange_queue: AbstractQueue | None = None
|
||||||
self.tenant_queues: Dict[str, AbstractChannel] = {}
|
self.tenant_queues: Dict[str, AbstractChannel] = {}
|
||||||
|
self.consumer_tags: Dict[str, str] = {}
|
||||||
|
|
||||||
|
self.message_count: int = 0
|
||||||
|
|
||||||
|
@retry(tries=5, exceptions=AMQPConnectionError, reraise=True, logger=logger)
|
||||||
async def connect(self) -> None:
|
async def connect(self) -> None:
|
||||||
self.connection = await connect_robust(**self.config.connection_params)
|
logger.info("Attempting to connect to RabbitMQ...")
|
||||||
|
self.connection = await connect(**self.config.connection_params)
|
||||||
|
self.connection.close_callbacks.add(self.on_connection_close)
|
||||||
self.channel = await self.connection.channel()
|
self.channel = await self.connection.channel()
|
||||||
await self.channel.set_qos(prefetch_count=1)
|
await self.channel.set_qos(prefetch_count=1)
|
||||||
|
logger.info("Successfully connected to RabbitMQ")
|
||||||
|
|
||||||
|
async def on_connection_close(self, sender, exc):
|
||||||
|
"""This is a callback for unexpected connection closures."""
|
||||||
|
logger.debug(f"Sender: {sender}")
|
||||||
|
if isinstance(exc, ConnectionClosed):
|
||||||
|
logger.warning("Connection to RabbitMQ lost. Attempting to reconnect...")
|
||||||
|
try:
|
||||||
|
active_tenants = await self.fetch_active_tenants()
|
||||||
|
await self.run(active_tenants=active_tenants)
|
||||||
|
logger.debug("Reconnected to RabbitMQ successfully")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to reconnect to RabbitMQ: {e}")
|
||||||
|
# cancel queue manager and webserver to shutdown service
|
||||||
|
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
||||||
|
[task.cancel() for task in tasks if task.get_name() in ["queues", "webserver"]]
|
||||||
|
else:
|
||||||
|
logger.debug("Connection closed on purpose.")
|
||||||
|
|
||||||
async def is_ready(self) -> bool:
|
async def is_ready(self) -> bool:
|
||||||
await self.connect()
|
if self.connection is None or self.connection.is_closed:
|
||||||
return await self.channel.is_open
|
try:
|
||||||
|
await self.connect()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to connect to RabbitMQ: {e}")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||||
async def setup_exchanges(self) -> None:
|
async def setup_exchanges(self) -> None:
|
||||||
self.tenant_exchange = await self.channel.declare_exchange(
|
self.tenant_exchange = await self.channel.declare_exchange(
|
||||||
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
||||||
@ -87,61 +122,79 @@ class AsyncQueueManager:
|
|||||||
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# we must declare DLQ to handle error messages
|
||||||
|
self.dead_letter_queue = await self.channel.declare_queue(
|
||||||
|
self.config.service_dead_letter_queue_name, durable=True
|
||||||
|
)
|
||||||
|
|
||||||
|
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
|
||||||
async def setup_tenant_queue(self) -> None:
|
async def setup_tenant_queue(self) -> None:
|
||||||
queue = await self.channel.declare_queue(
|
self.tenant_exchange_queue = await self.channel.declare_queue(
|
||||||
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
|
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
|
||||||
durable=True,
|
durable=True,
|
||||||
arguments={
|
arguments={
|
||||||
"x-dead-letter-exchange": "",
|
"x-dead-letter-exchange": "",
|
||||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||||
"x-expires": self.config.queue_expiration_time,
|
"x-expires": self.config.queue_expiration_time,
|
||||||
"x-max-priority": 2,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await queue.bind(self.tenant_exchange, routing_key="tenant.*")
|
await self.tenant_exchange_queue.bind(self.tenant_exchange, routing_key="tenant.*")
|
||||||
await queue.consume(self.process_tenant_message)
|
self.consumer_tags["tenant_exchange_queue"] = await self.tenant_exchange_queue.consume(
|
||||||
|
self.process_tenant_message
|
||||||
|
)
|
||||||
|
|
||||||
async def process_tenant_message(self, message: AbstractIncomingMessage) -> None:
|
async def process_tenant_message(self, message: AbstractIncomingMessage) -> None:
|
||||||
async with message.process():
|
try:
|
||||||
message_body = json.loads(message.body.decode())
|
async with message.process():
|
||||||
logger.debug(f"Tenant message received: {message_body}")
|
message_body = json.loads(message.body.decode())
|
||||||
tenant_id = message_body["tenantId"]
|
logger.debug(f"Tenant message received: {message_body}")
|
||||||
routing_key = message.routing_key
|
tenant_id = message_body["tenantId"]
|
||||||
|
routing_key = message.routing_key
|
||||||
|
|
||||||
if routing_key == "tenant.created":
|
if routing_key == "tenant.created":
|
||||||
await self.create_tenant_queues(tenant_id)
|
await self.create_tenant_queues(tenant_id)
|
||||||
elif routing_key == "tenant.delete":
|
elif routing_key == "tenant.delete":
|
||||||
await self.delete_tenant_queues(tenant_id)
|
await self.delete_tenant_queues(tenant_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e, exc_info=True)
|
||||||
|
|
||||||
async def create_tenant_queues(self, tenant_id: str) -> None:
|
async def create_tenant_queues(self, tenant_id: str) -> None:
|
||||||
queue_name = f"{self.config.input_queue_prefix}_{tenant_id}"
|
queue_name = f"{self.config.input_queue_prefix}_{tenant_id}"
|
||||||
logger.info(f"Declaring queue: {queue_name}")
|
logger.info(f"Declaring queue: {queue_name}")
|
||||||
input_queue = await self.channel.declare_queue(
|
try:
|
||||||
queue_name,
|
input_queue = await self.channel.declare_queue(
|
||||||
durable=True,
|
queue_name,
|
||||||
arguments={
|
durable=True,
|
||||||
"x-dead-letter-exchange": "",
|
arguments={
|
||||||
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
"x-dead-letter-exchange": "",
|
||||||
"x-expires": self.config.queue_expiration_time,
|
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
|
||||||
"x-max-priority": 2,
|
},
|
||||||
},
|
)
|
||||||
)
|
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
|
||||||
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
|
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
|
||||||
await input_queue.consume(self.process_input_message)
|
self.tenant_queues[tenant_id] = input_queue
|
||||||
|
logger.info(f"Created and started consuming queue for tenant {tenant_id}")
|
||||||
self.tenant_queues[tenant_id] = input_queue
|
except Exception as e:
|
||||||
logger.info(f"Created queues for tenant {tenant_id}")
|
logger.error(e, exc_info=True)
|
||||||
|
|
||||||
async def delete_tenant_queues(self, tenant_id: str) -> None:
|
async def delete_tenant_queues(self, tenant_id: str) -> None:
|
||||||
if tenant_id in self.tenant_queues:
|
if tenant_id in self.tenant_queues:
|
||||||
# somehow queue.delete() does not work here
|
# somehow queue.delete() does not work here
|
||||||
await self.channel.queue_delete(f"{self.config.input_queue_prefix}_{tenant_id}")
|
await self.channel.queue_delete(f"{self.config.input_queue_prefix}_{tenant_id}")
|
||||||
del self.tenant_queues[tenant_id]
|
del self.tenant_queues[tenant_id]
|
||||||
|
del self.consumer_tags[tenant_id]
|
||||||
logger.info(f"Deleted queues for tenant {tenant_id}")
|
logger.info(f"Deleted queues for tenant {tenant_id}")
|
||||||
|
|
||||||
async def process_input_message(self, message: IncomingMessage) -> None:
|
async def process_input_message(self, message: IncomingMessage) -> None:
|
||||||
async def process_message_body_and_await_result(unpacked_message_body):
|
async def process_message_body_and_await_result(unpacked_message_body):
|
||||||
return self.message_processor(unpacked_message_body)
|
async with self.semaphore:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
|
||||||
|
logger.info("Processing payload in a separate thread.")
|
||||||
|
result = await loop.run_in_executor(
|
||||||
|
thread_pool_executor, self.message_processor, unpacked_message_body
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
async with message.process(ignore_processed=True):
|
async with message.process(ignore_processed=True):
|
||||||
if message.redelivered:
|
if message.redelivered:
|
||||||
@ -156,6 +209,8 @@ class AsyncQueueManager:
|
|||||||
await self.shutdown()
|
await self.shutdown()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
self.message_count += 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tenant_id = message.routing_key
|
tenant_id = message.routing_key
|
||||||
|
|
||||||
@ -179,14 +234,15 @@ class AsyncQueueManager:
|
|||||||
|
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
await message.nack(requeue=False)
|
await message.nack(requeue=False)
|
||||||
logger.error(f"Invalid JSON in input message: {message.body}")
|
logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
logger.warning(f"{e}, declining message with {message.delivery_tag=}.")
|
logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
|
||||||
await message.nack(requeue=False)
|
await message.nack(requeue=False)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await message.nack(requeue=False)
|
await message.nack(requeue=False)
|
||||||
logger.error(f"Error processing input message: {e}", exc_info=True)
|
logger.error(f"Error processing input message: {e}", exc_info=True)
|
||||||
raise
|
finally:
|
||||||
|
self.message_count -= 1
|
||||||
|
|
||||||
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
|
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
|
||||||
await self.output_exchange.publish(
|
await self.output_exchange.publish(
|
||||||
@ -195,12 +251,7 @@ class AsyncQueueManager:
|
|||||||
)
|
)
|
||||||
logger.info(f"Published result to queue {tenant_id}.")
|
logger.info(f"Published result to queue {tenant_id}.")
|
||||||
|
|
||||||
@retry(
|
@retry(tries=5, exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, logger=logger)
|
||||||
stop=stop_after_attempt(5),
|
|
||||||
wait=wait_exponential_jitter(initial=1, max=10),
|
|
||||||
retry=retry_if_exception_type(aiohttp.ClientResponseError),
|
|
||||||
reraise=True,
|
|
||||||
)
|
|
||||||
async def fetch_active_tenants(self) -> Set[str]:
|
async def fetch_active_tenants(self) -> Set[str]:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.get(self.tenant_service_url) as response:
|
async with session.get(self.tenant_service_url) as response:
|
||||||
@ -214,45 +265,65 @@ class AsyncQueueManager:
|
|||||||
)
|
)
|
||||||
return set()
|
return set()
|
||||||
|
|
||||||
async def initialize_tenant_queues(self) -> None:
|
@retry(
|
||||||
try:
|
tries=5,
|
||||||
active_tenants = await self.fetch_active_tenants()
|
exceptions=(
|
||||||
except aiohttp.ClientResponseError:
|
AMQPConnectionError,
|
||||||
logger.warning("API calls to tenant server failed. No tenant queues initialized.")
|
ChannelInvalidStateError,
|
||||||
active_tenants = set()
|
),
|
||||||
|
reraise=True,
|
||||||
|
logger=logger,
|
||||||
|
)
|
||||||
|
async def initialize_tenant_queues(self, active_tenants: set) -> None:
|
||||||
for tenant_id in active_tenants:
|
for tenant_id in active_tenants:
|
||||||
await self.create_tenant_queues(tenant_id)
|
await self.create_tenant_queues(tenant_id)
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self, active_tenants: set) -> None:
|
||||||
stop = asyncio.Event()
|
|
||||||
|
|
||||||
def signal_handler(*_):
|
await self.connect()
|
||||||
logger.info("Signal received, shutting down...")
|
await self.setup_exchanges()
|
||||||
stop.set()
|
await self.initialize_tenant_queues(active_tenants=active_tenants)
|
||||||
|
await self.setup_tenant_queue()
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
||||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
||||||
loop.add_signal_handler(sig, signal_handler)
|
|
||||||
|
|
||||||
|
async def close_channels(self) -> None:
|
||||||
try:
|
try:
|
||||||
await self.connect()
|
if self.channel and not self.channel.is_closed:
|
||||||
await self.setup_exchanges()
|
# Cancel queues to stop fetching messages
|
||||||
await self.initialize_tenant_queues()
|
logger.debug("Cancelling queues...")
|
||||||
await self.setup_tenant_queue()
|
for tenant, queue in self.tenant_queues.items():
|
||||||
|
await queue.cancel(self.consumer_tags[tenant])
|
||||||
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
|
if self.tenant_exchange_queue:
|
||||||
await stop.wait() # Run until stop signal received
|
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
|
||||||
except asyncio.CancelledError:
|
while self.message_count != 0:
|
||||||
logger.warning("Operation cancelled.")
|
logger.debug(f"Messages are still being processed: {self.message_count=} ")
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
await self.channel.close(exc=asyncio.CancelledError)
|
||||||
|
logger.debug("Channel closed.")
|
||||||
|
else:
|
||||||
|
logger.debug("No channel to close.")
|
||||||
|
except ChannelClosed:
|
||||||
|
logger.warning("Channel was already closed.")
|
||||||
|
except ConnectionClosed:
|
||||||
|
logger.warning("Connection was lost, unable to close channel.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"An error occurred: {e}", exc_info=True)
|
logger.error(f"Error during channel shutdown: {e}")
|
||||||
finally:
|
|
||||||
await self.shutdown()
|
async def close_connection(self) -> None:
|
||||||
|
try:
|
||||||
|
if self.connection and not self.connection.is_closed:
|
||||||
|
await self.connection.close(exc=asyncio.CancelledError)
|
||||||
|
logger.debug("Connection closed.")
|
||||||
|
else:
|
||||||
|
logger.debug("No connection to close.")
|
||||||
|
except ConnectionClosed:
|
||||||
|
logger.warning("Connection was already closed.")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error closing connection: {e}")
|
||||||
|
|
||||||
async def shutdown(self) -> None:
|
async def shutdown(self) -> None:
|
||||||
logger.info("Shutting down RabbitMQ handler...")
|
logger.info("Shutting down RabbitMQ handler...")
|
||||||
if self.channel:
|
await self.close_channels()
|
||||||
await self.channel.close()
|
await self.close_connection()
|
||||||
if self.connection:
|
|
||||||
await self.connection.close()
|
|
||||||
logger.info("RabbitMQ handler shut down successfully.")
|
logger.info("RabbitMQ handler shut down successfully.")
|
||||||
|
|||||||
@ -1,15 +1,16 @@
|
|||||||
from typing import Callable, Union
|
from typing import Callable
|
||||||
|
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
|
|
||||||
from pyinfra.storage.connection import get_storage
|
from pyinfra.storage.connection import get_storage
|
||||||
from pyinfra.storage.utils import (
|
from pyinfra.storage.utils import (
|
||||||
download_data_as_specified_in_message,
|
download_data_bytes_as_specified_in_message,
|
||||||
upload_data_as_specified_in_message,
|
upload_data_as_specified_in_message,
|
||||||
|
DownloadedData,
|
||||||
)
|
)
|
||||||
|
|
||||||
DataProcessor = Callable[[Union[dict, bytes], dict], Union[dict, list, str]]
|
DataProcessor = Callable[[dict[str, DownloadedData] | DownloadedData, dict], dict | list | str]
|
||||||
Callback = Callable[[dict], dict]
|
Callback = Callable[[dict], dict]
|
||||||
|
|
||||||
|
|
||||||
@ -28,7 +29,9 @@ def make_download_process_upload_callback(data_processor: DataProcessor, setting
|
|||||||
|
|
||||||
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
|
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
|
||||||
|
|
||||||
data = download_data_as_specified_in_message(storage, queue_message_payload)
|
data: dict[str, DownloadedData] | DownloadedData = download_data_bytes_as_specified_in_message(
|
||||||
|
storage, queue_message_payload
|
||||||
|
)
|
||||||
|
|
||||||
result = data_processor(data, queue_message_payload)
|
result = data_processor(data, queue_message_payload)
|
||||||
|
|
||||||
|
|||||||
@ -10,8 +10,8 @@ import pika
|
|||||||
import pika.exceptions
|
import pika.exceptions
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
|
from kn_utils.retry import retry
|
||||||
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
|
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
|
||||||
from retry import retry
|
|
||||||
|
|
||||||
from pyinfra.config.loader import validate_settings
|
from pyinfra.config.loader import validate_settings
|
||||||
from pyinfra.config.validators import queue_manager_validators
|
from pyinfra.config.validators import queue_manager_validators
|
||||||
@ -35,11 +35,16 @@ class QueueManager:
|
|||||||
self.connection: Union[BlockingConnection, None] = None
|
self.connection: Union[BlockingConnection, None] = None
|
||||||
self.channel: Union[BlockingChannel, None] = None
|
self.channel: Union[BlockingChannel, None] = None
|
||||||
self.connection_sleep = settings.rabbitmq.connection_sleep
|
self.connection_sleep = settings.rabbitmq.connection_sleep
|
||||||
|
self.processing_callback = False
|
||||||
|
self.received_signal = False
|
||||||
|
|
||||||
atexit.register(self.stop_consuming)
|
atexit.register(self.stop_consuming)
|
||||||
signal.signal(signal.SIGTERM, self._handle_stop_signal)
|
signal.signal(signal.SIGTERM, self._handle_stop_signal)
|
||||||
signal.signal(signal.SIGINT, self._handle_stop_signal)
|
signal.signal(signal.SIGINT, self._handle_stop_signal)
|
||||||
|
|
||||||
|
self.max_retries = settings.rabbitmq.max_retries or 5
|
||||||
|
self.max_delay = settings.rabbitmq.max_delay or 60
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_connection_parameters(settings: Dynaconf):
|
def create_connection_parameters(settings: Dynaconf):
|
||||||
credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password)
|
credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password)
|
||||||
@ -52,9 +57,12 @@ class QueueManager:
|
|||||||
|
|
||||||
return pika.ConnectionParameters(**pika_connection_params)
|
return pika.ConnectionParameters(**pika_connection_params)
|
||||||
|
|
||||||
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger)
|
@retry(
|
||||||
|
tries=5,
|
||||||
|
exceptions=(pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker),
|
||||||
|
reraise=True,
|
||||||
|
)
|
||||||
def establish_connection(self):
|
def establish_connection(self):
|
||||||
# TODO: set sensible retry parameters
|
|
||||||
if self.connection and self.connection.is_open:
|
if self.connection and self.connection.is_open:
|
||||||
logger.debug("Connection to RabbitMQ already established.")
|
logger.debug("Connection to RabbitMQ already established.")
|
||||||
return
|
return
|
||||||
@ -77,19 +85,31 @@ class QueueManager:
|
|||||||
logger.info("Connection to RabbitMQ established, channel open.")
|
logger.info("Connection to RabbitMQ established, channel open.")
|
||||||
|
|
||||||
def is_ready(self):
|
def is_ready(self):
|
||||||
self.establish_connection()
|
try:
|
||||||
return self.channel.is_open
|
self.establish_connection()
|
||||||
|
return self.channel.is_open
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to establish connection: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
@retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger)
|
@retry(
|
||||||
|
tries=5,
|
||||||
|
exceptions=pika.exceptions.AMQPConnectionError,
|
||||||
|
reraise=True,
|
||||||
|
)
|
||||||
def start_consuming(self, message_processor: Callable):
|
def start_consuming(self, message_processor: Callable):
|
||||||
on_message_callback = self._make_on_message_callback(message_processor)
|
on_message_callback = self._make_on_message_callback(message_processor)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.establish_connection()
|
self.establish_connection()
|
||||||
self.channel.basic_consume(self.input_queue, on_message_callback)
|
self.channel.basic_consume(self.input_queue, on_message_callback)
|
||||||
|
logger.info("Starting to consume messages...")
|
||||||
self.channel.start_consuming()
|
self.channel.start_consuming()
|
||||||
except Exception:
|
except pika.exceptions.AMQPConnectionError as e:
|
||||||
logger.error("An unexpected error occurred while consuming messages. Consuming will stop.", exc_info=True)
|
logger.error(f"AMQP Connection Error: {e}")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An unexpected error occurred while consuming messages: {e}", exc_info=True)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
self.stop_consuming()
|
self.stop_consuming()
|
||||||
@ -151,6 +171,7 @@ class QueueManager:
|
|||||||
|
|
||||||
def on_message_callback(channel, method, properties, body):
|
def on_message_callback(channel, method, properties, body):
|
||||||
logger.info(f"Received message from queue with delivery_tag {method.delivery_tag}.")
|
logger.info(f"Received message from queue with delivery_tag {method.delivery_tag}.")
|
||||||
|
self.processing_callback = True
|
||||||
|
|
||||||
if method.redelivered:
|
if method.redelivered:
|
||||||
logger.warning(f"Declining message with {method.delivery_tag=} due to it being redelivered.")
|
logger.warning(f"Declining message with {method.delivery_tag=} due to it being redelivered.")
|
||||||
@ -192,9 +213,17 @@ class QueueManager:
|
|||||||
channel.basic_nack(method.delivery_tag, requeue=False)
|
channel.basic_nack(method.delivery_tag, requeue=False)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.processing_callback = False
|
||||||
|
if self.received_signal:
|
||||||
|
self.stop_consuming()
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
return on_message_callback
|
return on_message_callback
|
||||||
|
|
||||||
def _handle_stop_signal(self, signum, *args, **kwargs):
|
def _handle_stop_signal(self, signum, *args, **kwargs):
|
||||||
logger.info(f"Received signal {signum}, stopping consuming...")
|
logger.info(f"Received signal {signum}, stopping consuming...")
|
||||||
self.stop_consuming()
|
self.received_signal = True
|
||||||
sys.exit(0)
|
if not self.processing_callback:
|
||||||
|
self.stop_consuming()
|
||||||
|
sys.exit(0)
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import gzip
|
import gzip
|
||||||
import json
|
import json
|
||||||
from functools import singledispatch
|
from functools import singledispatch
|
||||||
from typing import Union
|
from typing import TypedDict
|
||||||
|
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
@ -52,28 +52,27 @@ class TenantIdDossierIdFileIdUploadPayload(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class TargetResponseFilePathDownloadPayload(BaseModel):
|
class TargetResponseFilePathDownloadPayload(BaseModel):
|
||||||
targetFilePath: Union[str, dict]
|
targetFilePath: str | dict[str, str]
|
||||||
|
|
||||||
|
|
||||||
class TargetResponseFilePathUploadPayload(BaseModel):
|
class TargetResponseFilePathUploadPayload(BaseModel):
|
||||||
responseFilePath: str
|
responseFilePath: str
|
||||||
|
|
||||||
|
|
||||||
def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -> Union[dict, bytes]:
|
class DownloadedData(TypedDict):
|
||||||
|
data: bytes
|
||||||
|
file_path: str
|
||||||
|
|
||||||
|
|
||||||
|
def download_data_bytes_as_specified_in_message(
|
||||||
|
storage: Storage, raw_payload: dict
|
||||||
|
) -> dict[str, DownloadedData] | DownloadedData:
|
||||||
"""Convenience function to download a file specified in a message payload.
|
"""Convenience function to download a file specified in a message payload.
|
||||||
Supports both legacy and new payload formats. Also supports downloading multiple files at once, which should
|
Supports both legacy and new payload formats. Also supports downloading multiple files at once, which should
|
||||||
be specified in a dictionary under the 'targetFilePath' key with the file path as value.
|
be specified in a dictionary under the 'targetFilePath' key with the file path as value.
|
||||||
|
The data is downloaded as bytes and returned as a dictionary with the file path as key and the data as value.
|
||||||
If the content is compressed with gzip (.gz), it will be decompressed (-> bytes).
|
In case of several download targets, a nested dictionary is returned with the same keys and dictionaries with
|
||||||
If the content is a json file, it will be decoded (-> dict).
|
the file path and data as values.
|
||||||
If no file is specified in the payload or the file does not exist in storage, an exception will be raised.
|
|
||||||
In all other cases, the content will be returned as is (-> bytes).
|
|
||||||
|
|
||||||
This function can be extended in the future as needed (e.g. handling of more file types), but since further
|
|
||||||
requirements are not specified at this point in time, and it is unclear what these would entail, the code is kept
|
|
||||||
simple for now to improve readability, maintainability and avoid refactoring efforts of generic solutions that
|
|
||||||
weren't as generic as they seemed.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -92,26 +91,25 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
|
|||||||
|
|
||||||
|
|
||||||
@singledispatch
|
@singledispatch
|
||||||
def _download(file_path_or_file_path_dict: Union[str, dict], storage: Storage) -> Union[dict, bytes]:
|
def _download(
|
||||||
|
file_path_or_file_path_dict: str | dict[str, str], storage: Storage
|
||||||
|
) -> dict[str, DownloadedData] | DownloadedData:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@_download.register(str)
|
@_download.register(str)
|
||||||
def _download_single_file(file_path: str, storage: Storage) -> bytes:
|
def _download_single_file(file_path: str, storage: Storage) -> DownloadedData:
|
||||||
if not storage.exists(file_path):
|
if not storage.exists(file_path):
|
||||||
raise FileNotFoundError(f"File '{file_path}' does not exist in storage.")
|
raise FileNotFoundError(f"File '{file_path}' does not exist in storage.")
|
||||||
|
|
||||||
data = storage.get_object(file_path)
|
data = storage.get_object(file_path)
|
||||||
|
|
||||||
data = gzip.decompress(data) if ".gz" in file_path else data
|
|
||||||
data = json.loads(data.decode("utf-8")) if ".json" in file_path else data
|
|
||||||
logger.info(f"Downloaded {file_path} from storage.")
|
logger.info(f"Downloaded {file_path} from storage.")
|
||||||
|
|
||||||
return data
|
return DownloadedData(data=data, file_path=file_path)
|
||||||
|
|
||||||
|
|
||||||
@_download.register(dict)
|
@_download.register(dict)
|
||||||
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict:
|
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict[str, DownloadedData]:
|
||||||
return {key: _download(value, storage) for key, value in file_path_dict.items()}
|
return {key: _download(value, storage) for key, value in file_path_dict.items()}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -3,8 +3,10 @@ import json
|
|||||||
from azure.monitor.opentelemetry import configure_azure_monitor
|
from azure.monitor.opentelemetry import configure_azure_monitor
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
from kn_utils.logging import logger
|
||||||
from opentelemetry import trace
|
from opentelemetry import trace
|
||||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
|
||||||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||||
from opentelemetry.instrumentation.pika import PikaInstrumentor
|
from opentelemetry.instrumentation.pika import PikaInstrumentor
|
||||||
from opentelemetry.sdk.resources import Resource
|
from opentelemetry.sdk.resources import Resource
|
||||||
@ -18,7 +20,6 @@ from opentelemetry.sdk.trace.export import (
|
|||||||
|
|
||||||
from pyinfra.config.loader import validate_settings
|
from pyinfra.config.loader import validate_settings
|
||||||
from pyinfra.config.validators import opentelemetry_validators
|
from pyinfra.config.validators import opentelemetry_validators
|
||||||
from kn_utils.logging import logger
|
|
||||||
|
|
||||||
|
|
||||||
class JsonSpanExporter(SpanExporter):
|
class JsonSpanExporter(SpanExporter):
|
||||||
@ -84,8 +85,11 @@ def get_exporter(settings: Dynaconf):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def instrument_pika():
|
def instrument_pika(dynamic_queues: bool):
|
||||||
PikaInstrumentor().instrument()
|
if dynamic_queues:
|
||||||
|
AioPikaInstrumentor().instrument()
|
||||||
|
else:
|
||||||
|
PikaInstrumentor().instrument()
|
||||||
|
|
||||||
|
|
||||||
def instrument_app(app: FastAPI, excluded_urls: str = "/health,/ready,/prometheus"):
|
def instrument_app(app: FastAPI, excluded_urls: str = "/health,/ready,/prometheus"):
|
||||||
|
|||||||
@ -1,16 +1,33 @@
|
|||||||
|
import asyncio
|
||||||
import inspect
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
|
import signal
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
import uvicorn
|
import uvicorn
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
from kn_utils.logging import logger
|
||||||
|
from kn_utils.retry import retry
|
||||||
|
|
||||||
from pyinfra.config.loader import validate_settings
|
from pyinfra.config.loader import validate_settings
|
||||||
from pyinfra.config.validators import webserver_validators
|
from pyinfra.config.validators import webserver_validators
|
||||||
|
|
||||||
|
|
||||||
|
class PyInfraUvicornServer(uvicorn.Server):
|
||||||
|
# this is a workaround to enable custom signal handlers
|
||||||
|
# https://github.com/encode/uvicorn/issues/1579
|
||||||
|
def install_signal_handlers(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
tries=5,
|
||||||
|
exceptions=Exception,
|
||||||
|
reraise=True,
|
||||||
|
)
|
||||||
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
|
||||||
validate_settings(settings, validators=webserver_validators)
|
validate_settings(settings, validators=webserver_validators)
|
||||||
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
|
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
|
||||||
@ -20,11 +37,43 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
|
|||||||
"""Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
|
"""Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
|
||||||
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
|
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
|
||||||
"""
|
"""
|
||||||
thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING))
|
|
||||||
|
def run_server():
|
||||||
|
retries = 5
|
||||||
|
for attempt in range(retries):
|
||||||
|
try:
|
||||||
|
uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
if attempt < retries - 1: # if it's not the last attempt
|
||||||
|
logger.warning(f"Attempt {attempt + 1} failed to start the server: {e}. Retrying...")
|
||||||
|
time.sleep(2**attempt) # exponential backoff
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed to start the server after {retries} attempts: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
thread = threading.Thread(target=run_server)
|
||||||
thread.daemon = True
|
thread.daemon = True
|
||||||
return thread
|
return thread
|
||||||
|
|
||||||
|
|
||||||
|
async def run_async_webserver(app: FastAPI, port: int, host: str):
|
||||||
|
"""Run the FastAPI web server async."""
|
||||||
|
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
|
||||||
|
server = PyInfraUvicornServer(config)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await server.serve()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.debug("Webserver was cancelled.")
|
||||||
|
server.should_exit = True
|
||||||
|
await server.shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error while running the webserver: {e}", exc_info=True)
|
||||||
|
finally:
|
||||||
|
logger.info("Webserver has been shut down.")
|
||||||
|
|
||||||
|
|
||||||
HealthFunction = Callable[[], bool]
|
HealthFunction = Callable[[], bool]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "pyinfra"
|
name = "pyinfra"
|
||||||
version = "3.0.0"
|
version = "4.1.0"
|
||||||
description = ""
|
description = ""
|
||||||
authors = ["Team Research <research@knecon.com>"]
|
authors = ["Team Research <research@knecon.com>"]
|
||||||
license = "All rights reseverd"
|
license = "All rights reseverd"
|
||||||
@ -18,35 +18,43 @@ azure-storage-blob = "^12.13"
|
|||||||
# misc utils
|
# misc utils
|
||||||
funcy = "^2"
|
funcy = "^2"
|
||||||
pycryptodome = "^3.19"
|
pycryptodome = "^3.19"
|
||||||
# research shared packages
|
|
||||||
kn-utils = { version = "^0.2.7", source = "gitlab-research" }
|
|
||||||
fastapi = "^0.109.0"
|
fastapi = "^0.109.0"
|
||||||
uvicorn = "^0.26.0"
|
uvicorn = "^0.26.0"
|
||||||
# [tool.poetry.group.telemetry.dependencies]
|
|
||||||
opentelemetry-instrumentation-pika = "^0.46b0"
|
# DONT USE GROUPS BECAUSE THEY ARE NOT INSTALLED FOR PACKAGES
|
||||||
opentelemetry-exporter-otlp = "^1.25.0"
|
# [tool.poetry.group.internal.dependencies] <<< THIS IS NOT WORKING
|
||||||
opentelemetry-instrumentation = "^0.46b0"
|
kn-utils = { version = ">=0.4.0", source = "nexus" }
|
||||||
opentelemetry-api = "^1.25.0"
|
# We set all opentelemetry dependencies to lower bound because the image classification service depends on a protobuf version <4, but does not use proto files.
|
||||||
opentelemetry-sdk = "^1.25.0"
|
# Therefore, we allow latest possible protobuf version in the services which use proto files. As soon as the dependency issue is fixed set this to the latest possible opentelemetry version
|
||||||
opentelemetry-exporter-otlp-proto-http = "^1.25.0"
|
opentelemetry-instrumentation-pika = ">=0.46b0,<0.50"
|
||||||
opentelemetry-instrumentation-flask = "^0.46b0"
|
opentelemetry-exporter-otlp = ">=1.25.0,<1.29"
|
||||||
opentelemetry-instrumentation-requests = "^0.46b0"
|
opentelemetry-instrumentation = ">=0.46b0,<0.50"
|
||||||
opentelemetry-instrumentation-fastapi = "^0.46b0"
|
opentelemetry-api = ">=1.25.0,<1.29"
|
||||||
|
opentelemetry-sdk = ">=1.25.0,<1.29"
|
||||||
|
opentelemetry-exporter-otlp-proto-http = ">=1.25.0,<1.29"
|
||||||
|
opentelemetry-instrumentation-flask = ">=0.46b0,<0.50"
|
||||||
|
opentelemetry-instrumentation-requests = ">=0.46b0,<0.50"
|
||||||
|
opentelemetry-instrumentation-fastapi = ">=0.46b0,<0.50"
|
||||||
|
opentelemetry-instrumentation-aio-pika = ">=0.46b0,<0.50"
|
||||||
wcwidth = "<=0.2.12"
|
wcwidth = "<=0.2.12"
|
||||||
azure-monitor-opentelemetry = "^1.6.0"
|
azure-monitor-opentelemetry = "^1.6.0"
|
||||||
aio-pika = "^9.4.2"
|
aio-pika = "^9.4.2"
|
||||||
aiohttp = "^3.9.5"
|
aiohttp = "^3.9.5"
|
||||||
tenacity = "^8.5.0"
|
|
||||||
|
|
||||||
|
# THIS IS NOT AVAILABLE FOR SERVICES THAT IMPLEMENT PYINFRA
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
pytest = "^7"
|
pytest = "^7"
|
||||||
ipykernel = "^6.26.0"
|
ipykernel = "^6.26.0"
|
||||||
black = "^23.10"
|
black = "^24.10"
|
||||||
pylint = "^3"
|
pylint = "^3"
|
||||||
coverage = "^7.3"
|
coverage = "^7.3"
|
||||||
requests = "^2.31"
|
requests = "^2.31"
|
||||||
pre-commit = "^3.6.0"
|
pre-commit = "^3.6.0"
|
||||||
cyclonedx-bom = "^4.1.1"
|
cyclonedx-bom = "^4.1.1"
|
||||||
|
dvc = "^3.51.2"
|
||||||
|
dvc-azure = "^3.1.0"
|
||||||
|
deepdiff = "^7.0.1"
|
||||||
|
pytest-cov = "^5.0.0"
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
minversion = "6.0"
|
minversion = "6.0"
|
||||||
@ -76,17 +84,18 @@ disable = [
|
|||||||
"R0904",
|
"R0904",
|
||||||
"R0913",
|
"R0913",
|
||||||
"R0914",
|
"R0914",
|
||||||
"W0511"
|
"W0511",
|
||||||
]
|
]
|
||||||
docstring-min-length = 3
|
docstring-min-length = 3
|
||||||
|
|
||||||
[[tool.poetry.source]]
|
[[tool.poetry.source]]
|
||||||
name = "PyPI"
|
name = "pypi-proxy"
|
||||||
|
url = "https://nexus.knecon.com/repository/pypi-proxy/simple"
|
||||||
priority = "primary"
|
priority = "primary"
|
||||||
|
|
||||||
[[tool.poetry.source]]
|
[[tool.poetry.source]]
|
||||||
name = "gitlab-research"
|
name = "nexus"
|
||||||
url = "https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi/simple"
|
url = "https://nexus.knecon.com/repository/python/simple"
|
||||||
priority = "explicit"
|
priority = "explicit"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
|
|||||||
17
scripts/send_sigterm.py
Normal file
17
scripts/send_sigterm.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
|
||||||
|
# BE CAREFUL WITH THIS SCRIPT - THIS SIMULATES A SIGTERM FROM KUBERNETES
|
||||||
|
target_pid = int(input("Enter the PID of the target script: "))
|
||||||
|
|
||||||
|
print(f"Sending SIGTERM to PID {target_pid}...")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.kill(target_pid, signal.SIGTERM)
|
||||||
|
print("SIGTERM sent.")
|
||||||
|
except ProcessLookupError:
|
||||||
|
print("Process not found.")
|
||||||
|
except PermissionError:
|
||||||
|
print("Permission denied. Are you trying to signal a process you don't own?")
|
||||||
39
scripts/setup/devenvsetup.sh
Normal file
39
scripts/setup/devenvsetup.sh
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
python_version=$1
|
||||||
|
nexus_user=$2
|
||||||
|
nexus_password=$3
|
||||||
|
|
||||||
|
# cookiecutter https://gitlab.knecon.com/knecon/research/template-python-project.git --checkout master
|
||||||
|
# latest_dir=$(ls -td -- */ | head -n 1) # should be the dir cookiecutter just created
|
||||||
|
|
||||||
|
# cd $latest_dir
|
||||||
|
|
||||||
|
pyenv install $python_version
|
||||||
|
pyenv local $python_version
|
||||||
|
pyenv shell $python_version
|
||||||
|
|
||||||
|
# install poetry globally (PREFERRED), only need to install it once
|
||||||
|
# curl -sSL https://install.python-poetry.org | python3 -
|
||||||
|
|
||||||
|
# remember to update poetry once in a while
|
||||||
|
poetry self update
|
||||||
|
|
||||||
|
# install poetry in current python environment, can lead to multiple instances of poetry being installed on one system (DISPREFERRED)
|
||||||
|
# pip install --upgrade pip
|
||||||
|
# pip install poetry
|
||||||
|
|
||||||
|
poetry config virtualenvs.in-project true
|
||||||
|
poetry config installer.max-workers 10
|
||||||
|
poetry config repositories.pypi-proxy "https://nexus.knecon.com/repository/pypi-proxy/simple"
|
||||||
|
poetry config http-basic.pypi-proxy ${nexus_user} ${nexus_password}
|
||||||
|
poetry config repositories.nexus https://nexus.knecon.com/repository/python/simple
|
||||||
|
poetry config http-basic.nexus ${nexus_user} ${nexus_password}
|
||||||
|
|
||||||
|
poetry env use $(pyenv which python)
|
||||||
|
poetry install --with=dev
|
||||||
|
poetry update
|
||||||
|
|
||||||
|
source .venv/bin/activate
|
||||||
|
|
||||||
|
pre-commit install
|
||||||
|
pre-commit autoupdate
|
||||||
@ -27,6 +27,8 @@ def storage(storage_backend, settings):
|
|||||||
def queue_manager(settings):
|
def queue_manager(settings):
|
||||||
settings.rabbitmq_heartbeat = 10
|
settings.rabbitmq_heartbeat = 10
|
||||||
settings.connection_sleep = 5
|
settings.connection_sleep = 5
|
||||||
|
settings.rabbitmq.max_retries = 3
|
||||||
|
settings.rabbitmq.max_delay = 10
|
||||||
queue_manager = QueueManager(settings)
|
queue_manager = QueueManager(settings)
|
||||||
yield queue_manager
|
yield queue_manager
|
||||||
|
|
||||||
|
|||||||
6
tests/data.dvc
Normal file
6
tests/data.dvc
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
outs:
|
||||||
|
- md5: 75cc98b7c8fcf782a7d4941594e6bc12.dir
|
||||||
|
size: 134913
|
||||||
|
nfiles: 9
|
||||||
|
hash: md5
|
||||||
|
path: data
|
||||||
@ -11,10 +11,18 @@ def exporter(settings):
|
|||||||
return get_exporter(settings)
|
return get_exporter(settings)
|
||||||
|
|
||||||
|
|
||||||
class TestOpenTelemetry:
|
@pytest.fixture(autouse=True)
|
||||||
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
|
def setup_test_trace(settings, exporter, tracing_type):
|
||||||
setup_trace(settings, exporter=exporter)
|
settings.tracing.type = tracing_type
|
||||||
|
setup_trace(settings, exporter=exporter)
|
||||||
|
|
||||||
|
|
||||||
|
class TestOpenTelemetry:
|
||||||
|
@pytest.mark.xfail(
|
||||||
|
reason="Azure Monitor requires a connection string. Therefore the test is allowed to fail in this case."
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize("tracing_type", ["opentelemetry", "azure_monitor"])
|
||||||
|
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
|
||||||
instrument_pika()
|
instrument_pika()
|
||||||
|
|
||||||
queue_manager.purge_queues()
|
queue_manager.purge_queues()
|
||||||
@ -31,21 +39,3 @@ class TestOpenTelemetry:
|
|||||||
assert (
|
assert (
|
||||||
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
|
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
|
||||||
)
|
)
|
||||||
|
|
||||||
# def test_webserver_requests_are_traced(self, settings):
|
|
||||||
# settings.tracing.opentelemetry.exporter = "console"
|
|
||||||
# settings.tracing.enabled = True
|
|
||||||
#
|
|
||||||
# app = FastAPI()
|
|
||||||
#
|
|
||||||
# @app.get("/test")
|
|
||||||
# def test():
|
|
||||||
# return {"test": "test"}
|
|
||||||
#
|
|
||||||
# thread = create_webserver_thread_from_settings(app, settings)
|
|
||||||
# thread.start()
|
|
||||||
# sleep(1)
|
|
||||||
#
|
|
||||||
# requests.get(f"http://{settings.webserver.host}:{settings.webserver.port}/test")
|
|
||||||
#
|
|
||||||
# thread.join(timeout=1)
|
|
||||||
@ -3,7 +3,6 @@ from sys import stdout
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
import pika
|
import pika
|
||||||
import pytest
|
|
||||||
from kn_utils.logging import logger
|
from kn_utils.logging import logger
|
||||||
|
|
||||||
logger.remove()
|
logger.remove()
|
||||||
@ -7,7 +7,7 @@ from fastapi import FastAPI
|
|||||||
|
|
||||||
from pyinfra.storage.connection import get_storage_for_tenant
|
from pyinfra.storage.connection import get_storage_for_tenant
|
||||||
from pyinfra.storage.utils import (
|
from pyinfra.storage.utils import (
|
||||||
download_data_as_specified_in_message,
|
download_data_bytes_as_specified_in_message,
|
||||||
upload_data_as_specified_in_message,
|
upload_data_as_specified_in_message,
|
||||||
)
|
)
|
||||||
from pyinfra.utils.cipher import encrypt
|
from pyinfra.utils.cipher import encrypt
|
||||||
@ -139,16 +139,6 @@ def payload(payload_type):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def expected_data(payload_type):
|
|
||||||
if payload_type == "target_response_file_path":
|
|
||||||
return {"data": "success"}
|
|
||||||
elif payload_type == "dossier_id_file_id":
|
|
||||||
return {"dossierId": "test", "fileId": "file", "data": "success"}
|
|
||||||
elif payload_type == "target_file_dict":
|
|
||||||
return {"file_1": {"data": "success"}, "file_2": {"data": "success"}}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"payload_type",
|
"payload_type",
|
||||||
[
|
[
|
||||||
@ -160,17 +150,17 @@ def expected_data(payload_type):
|
|||||||
)
|
)
|
||||||
@pytest.mark.parametrize("storage_backend", ["azure", "s3"], scope="class")
|
@pytest.mark.parametrize("storage_backend", ["azure", "s3"], scope="class")
|
||||||
class TestDownloadAndUploadFromMessage:
|
class TestDownloadAndUploadFromMessage:
|
||||||
def test_download_and_upload_from_message(self, storage, payload, expected_data, payload_type):
|
def test_download_and_upload_from_message(self, storage, payload, payload_type):
|
||||||
storage.clear_bucket()
|
storage.clear_bucket()
|
||||||
|
|
||||||
upload_data = expected_data if payload_type != "target_file_dict" else expected_data["file_1"]
|
result = {"process_result": "success"}
|
||||||
storage.put_object("test/file.target.json.gz", gzip.compress(json.dumps(upload_data).encode()))
|
storage_data = {**payload, "data": result}
|
||||||
|
packed_data = gzip.compress(json.dumps(storage_data).encode())
|
||||||
|
|
||||||
data = download_data_as_specified_in_message(storage, payload)
|
storage.put_object("test/file.target.json.gz", packed_data)
|
||||||
|
|
||||||
assert data == expected_data
|
_ = download_data_bytes_as_specified_in_message(storage, payload)
|
||||||
|
upload_data_as_specified_in_message(storage, payload, result)
|
||||||
upload_data_as_specified_in_message(storage, payload, expected_data)
|
|
||||||
data = json.loads(gzip.decompress(storage.get_object("test/file.response.json.gz")).decode())
|
data = json.loads(gzip.decompress(storage.get_object("test/file.response.json.gz")).decode())
|
||||||
|
|
||||||
assert data == {**payload, "data": expected_data}
|
assert data == storage_data
|
||||||
83
tests/unit_test/utils_download_test.py
Normal file
83
tests/unit_test/utils_download_test.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import patch
|
||||||
|
from pyinfra.storage.utils import (
|
||||||
|
download_data_bytes_as_specified_in_message,
|
||||||
|
upload_data_as_specified_in_message,
|
||||||
|
DownloadedData,
|
||||||
|
)
|
||||||
|
from pyinfra.storage.storages.storage import Storage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_storage():
|
||||||
|
with patch("pyinfra.storage.utils.Storage") as MockStorage:
|
||||||
|
yield MockStorage()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
params=[
|
||||||
|
{
|
||||||
|
"raw_payload": {
|
||||||
|
"tenantId": "tenant1",
|
||||||
|
"dossierId": "dossier1",
|
||||||
|
"fileId": "file1",
|
||||||
|
"targetFileExtension": "txt",
|
||||||
|
"responseFileExtension": "json",
|
||||||
|
},
|
||||||
|
"expected_result": {
|
||||||
|
"data": b'{"key": "value"}',
|
||||||
|
"file_path": "tenant1/dossier1/file1.txt"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"raw_payload": {
|
||||||
|
"targetFilePath": "some/path/to/file.txt.gz",
|
||||||
|
"responseFilePath": "some/path/to/file.json"
|
||||||
|
},
|
||||||
|
"expected_result": {
|
||||||
|
"data": b'{"key": "value"}',
|
||||||
|
"file_path": "some/path/to/file.txt.gz"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"raw_payload": {
|
||||||
|
"targetFilePath": {
|
||||||
|
"file1": "some/path/to/file1.txt.gz",
|
||||||
|
"file2": "some/path/to/file2.txt.gz"
|
||||||
|
},
|
||||||
|
"responseFilePath": "some/path/to/file.json"
|
||||||
|
},
|
||||||
|
"expected_result": {
|
||||||
|
"file1": {
|
||||||
|
"data": b'{"key": "value"}',
|
||||||
|
"file_path": "some/path/to/file1.txt.gz"
|
||||||
|
},
|
||||||
|
"file2": {
|
||||||
|
"data": b'{"key": "value"}',
|
||||||
|
"file_path": "some/path/to/file2.txt.gz"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def payload_and_expected_result(request):
|
||||||
|
return request.param
|
||||||
|
|
||||||
|
def test_download_data_bytes_as_specified_in_message(mock_storage, payload_and_expected_result):
|
||||||
|
raw_payload = payload_and_expected_result["raw_payload"]
|
||||||
|
expected_result = payload_and_expected_result["expected_result"]
|
||||||
|
mock_storage.get_object.return_value = b'{"key": "value"}'
|
||||||
|
|
||||||
|
result = download_data_bytes_as_specified_in_message(mock_storage, raw_payload)
|
||||||
|
|
||||||
|
assert isinstance(result, dict)
|
||||||
|
assert result == expected_result
|
||||||
|
mock_storage.get_object.assert_called()
|
||||||
|
|
||||||
|
def test_upload_data_as_specified_in_message(mock_storage, payload_and_expected_result):
|
||||||
|
raw_payload = payload_and_expected_result["raw_payload"]
|
||||||
|
data = {"key": "value"}
|
||||||
|
upload_data_as_specified_in_message(mock_storage, raw_payload, data)
|
||||||
|
mock_storage.put_object.assert_called_once()
|
||||||
Loading…
x
Reference in New Issue
Block a user