PyInfra

  1. About
  2. Configuration
  3. Queue Manager
  4. Module Installation
  5. Scripts
  6. Tests

About

Shared library for the research team, containing code related to infrastructure and communication with other services. It offers a simple interface for processing data and sending responses via AMQP, monitoring via Prometheus, and storage access via S3 or Azure. It also exports traces via OpenTelemetry for queue messages and webserver requests.

To start, see the complete example, which shows how to use all features of the service. It can be imported and used directly for default research service pipelines: the message carries a data ID, the data is downloaded from storage, and the result is uploaded, while the service offers Prometheus monitoring, /health and /ready endpoints, and multi-tenancy support.

Configuration

Configuration is done via Dynaconf. This means that you can use environment variables, a .env file or .toml file(s) to configure the service. You can also combine these methods. The precedence is environment variables > .env > .toml. It is recommended to load settings with the provided load_settings function, which you can combine with the provided parse_args function. This allows you to load settings from a .toml file or a folder with .toml files and override them with environment variables.
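
The naming convention behind this precedence can be sketched with the standard library alone. The snippet below mimics how a double-underscore environment variable such as LOGGING__LEVEL maps onto the nested key logging.level and overrides a value read from a .toml file. It is an illustration only, not Dynaconf's implementation: apply_env_overrides is a hypothetical helper, and unlike Dynaconf it keeps every environment value as a plain string.

```python
import copy
from typing import Any, Dict, Mapping


def apply_env_overrides(file_settings: Dict[str, Any], environ: Mapping[str, str]) -> Dict[str, Any]:
    """Return a copy of file_settings with SECTION__KEY variables applied on top."""
    merged = copy.deepcopy(file_settings)
    for name, value in environ.items():
        if "__" not in name:
            continue  # not a nested setting, so it is ignored in this sketch
        # "LOGGING__LEVEL" -> sections ["logging"], key "level"
        *sections, key = name.lower().split("__")
        node = merged
        for section in sections:
            node = node.setdefault(section, {})
        node[key] = value  # the environment wins over file values
    return merged


# Values as they might come from a .toml file (hypothetical example data).
toml_values = {"logging": {"level": "INFO"}, "webserver": {"port": 8080}}
env = {"LOGGING__LEVEL": "DEBUG", "HOME": "/root"}

settings = apply_env_overrides(toml_values, env)
print(settings["logging"]["level"])   # DEBUG
print(settings["webserver"]["port"])  # 8080
```

In a real service, load_settings does this work; the sketch is only meant to make the LOGGING__LEVEL to logging.level mapping concrete.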

The following table shows all necessary settings. You can find a preconfigured settings file for this service in bitbucket. This is the complete set of settings; you only need all of them if you use every feature of the service, as described in the complete example.

| Environment Variable | Internal / .toml Name | Description |
| --- | --- | --- |
| LOGGING__LEVEL | logging.level | Log level |
| CONCURRENCY__ENABLED | concurrency.enabled | Enable multi tenant queue mode |
| METRICS__PROMETHEUS__ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
| METRICS__PROMETHEUS__PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
| WEBSERVER__HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
| WEBSERVER__PORT | webserver.port | Port of the webserver |
| RABBITMQ__HOST | rabbitmq.host | Host of the RabbitMQ server |
| RABBITMQ__PORT | rabbitmq.port | Port of the RabbitMQ server |
| RABBITMQ__USERNAME | rabbitmq.username | Username for the RabbitMQ server |
| RABBITMQ__PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
| RABBITMQ__HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
| RABBITMQ__CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep interval during message processing. Has to be a divisor of the heartbeat and should not be too large, since queue interactions (such as receiving new messages) happen only in these intervals. This is also the minimum time the service needs to process a message. |
| RABBITMQ__INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue in single queue setting |
| RABBITMQ__OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue in single queue setting |
| RABBITMQ__DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue in single queue setting |
| RABBITMQ__TENANT_EVENT_QUEUE_SUFFIX | rabbitmq.tenant_event_queue_suffix | Suffix for the tenant event queue in multi tenant/queue setting |
| RABBITMQ__TENANT_EVENT_DLQ_SUFFIX | rabbitmq.tenant_event_dlq_suffix | Suffix for the dead letter queue in multi tenant/queue setting |
| RABBITMQ__TENANT_EXCHANGE_NAME | rabbitmq.tenant_exchange_name | Name of tenant exchange in multi tenant/queue setting |
| RABBITMQ__QUEUE_EXPIRATION_TIME | rabbitmq.queue_expiration_time | Time until queue expiration in multi tenant/queue setting |
| RABBITMQ__SERVICE_REQUEST_QUEUE_PREFIX | rabbitmq.service_request_queue_prefix | Service request queue prefix in multi tenant/queue setting |
| RABBITMQ__SERVICE_REQUEST_EXCHANGE_NAME | rabbitmq.service_request_exchange_name | Service request exchange name in multi tenant/queue setting |
| RABBITMQ__SERVICE_RESPONSE_EXCHANGE_NAME | rabbitmq.service_response_exchange_name | Service response exchange name in multi tenant/queue setting |
| RABBITMQ__SERVICE_DLQ_NAME | rabbitmq.service_dlq_name | Service dead letter queue name in multi tenant/queue setting |
| STORAGE__BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
| STORAGE__S3__BUCKET | storage.s3.bucket | Name of the S3 bucket |
| STORAGE__S3__ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
| STORAGE__S3__KEY | storage.s3.key | Access key for the S3 server |
| STORAGE__S3__SECRET | storage.s3.secret | Secret key for the S3 server |
| STORAGE__S3__REGION | storage.s3.region | Region of the S3 server |
| STORAGE__AZURE__CONTAINER | storage.azure.container_name | Name of the Azure container |
| STORAGE__AZURE__CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
| STORAGE__TENANT_SERVER__PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
| STORAGE__TENANT_SERVER__ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
| TRACING__ENABLED | tracing.enabled | Enable tracing |
| TRACING__TYPE | tracing.type | Tracing mode. Possible values: "opentelemetry", "azure_monitor" (expects the APPLICATIONINSIGHTS_CONNECTION_STRING environment variable) |
| TRACING__OPENTELEMETRY__ENDPOINT | tracing.opentelemetry.endpoint | Endpoint to which OpenTelemetry traces are exported |
| TRACING__OPENTELEMETRY__SERVICE_NAME | tracing.opentelemetry.service_name | Name of the service as displayed in the traces collected |
| TRACING__OPENTELEMETRY__EXPORTER | tracing.opentelemetry.exporter | Name of exporter |
| KUBERNETES__POD_NAME | kubernetes.pod_name | Service pod name |
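
The constraint on rabbitmq.connection_sleep (it has to divide rabbitmq.heartbeat evenly) is easy to get wrong, so a small check at startup can help. The following is a minimal sketch, assuming both values are positive integers in the same unit; check_connection_sleep is a hypothetical helper and not part of pyinfra.

```python
def check_connection_sleep(heartbeat: int, connection_sleep: int) -> None:
    """Raise ValueError unless connection_sleep is a positive divisor of heartbeat."""
    if heartbeat <= 0 or connection_sleep <= 0:
        raise ValueError("heartbeat and connection_sleep must be positive")
    if heartbeat % connection_sleep != 0:
        raise ValueError(
            f"connection_sleep={connection_sleep} does not divide heartbeat={heartbeat}"
        )


# A heartbeat of 60 with a sleep interval of 5 is accepted silently.
check_connection_sleep(60, 5)
```

A call such as check_connection_sleep(60, 7) would raise a ValueError, because 60 is not a multiple of 7.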

OpenTelemetry

OpenTelemetry (via its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be configured from environment variables, without additional work in the microservice app, although additional configuration is possible.

TRACING__OPENTELEMETRY__ENDPOINT should typically be set to http://otel-collector-opentelemetry-collector.otel-collector:4318/v1/traces.

Queue Manager

The queue manager is responsible for consuming messages from the input queue, processing them and sending the response to the output queue. The default callback also downloads data from the storage and uploads the result to the storage. The response message does not contain the data itself, but the identifiers from the input message (including headers beginning with "X-").
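
The shape of such a response can be sketched as follows. This is an assumption-laden illustration, not pyinfra's code: build_response is a hypothetical helper, the "X-" prefix is matched case-sensitively, and returning the identifiers and the forwarded headers as two separate entries is only one possible layout.

```python
from typing import Any, Dict


def build_response(message_body: Dict[str, Any], headers: Dict[str, Any]) -> Dict[str, Any]:
    """Echo the input identifiers and forward only the headers that start with "X-"."""
    forwarded = {name: value for name, value in headers.items() if name.startswith("X-")}
    # The processed data itself is not included; it is uploaded to storage instead.
    return {"body": dict(message_body), "headers": forwarded}


response = build_response(
    {"targetFilePath": "in/file.json", "responseFilePath": "out/file.json"},
    {"X-TENANT-ID": "tenant-a", "content-type": "application/json"},
)
print(response["headers"])  # {'X-TENANT-ID': 'tenant-a'}
```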

Standalone Usage

from pyinfra.config.loader import load_settings
from pyinfra.queue.callback import DataProcessor, make_download_process_upload_callback
from pyinfra.queue.manager import QueueManager

settings = load_settings("path/to/settings")

# Placeholder: assign your own function here. It should expect a dict (JSON) or bytes (PDF)
# as input and return a JSON-serializable object.
processing_function: DataProcessor

queue_manager = QueueManager(settings)
# Wrap the processing function exactly once; the resulting callback is what gets consumed.
callback = make_download_process_upload_callback(processing_function, settings)
queue_manager.start_consuming(callback)
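
For orientation, a processing function matching the description above might look like the sketch below. It only relies on what is stated here (dict or bytes in, JSON-serializable object out); the exact DataProcessor signature is an assumption and may require further parameters, so check the type definition in pyinfra.queue.callback before using it.

```python
import json
from typing import Union


def processing_function(data: Union[dict, bytes]) -> dict:
    """Toy processor: summarise the payload instead of doing real analysis."""
    if isinstance(data, bytes):
        # Binary input, for example the raw bytes of a PDF file.
        return {"type": "bytes", "size": len(data)}
    # JSON input that has already been decoded into a dict.
    return {"type": "json", "keys": sorted(data)}


result = processing_function(b"%PDF-1.7")
print(json.dumps(result))  # {"type": "bytes", "size": 8}
```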

Usage in a Service

This is the recommended way to use the module. This includes the webserver, Prometheus metrics and health endpoints. Custom endpoints can be added by adding a new route to the app object beforehand. Settings are loaded from files specified as CLI arguments (e.g. --settings-path path/to/settings.toml). The values can also be set or overridden via environment variables (e.g. LOGGING__LEVEL=DEBUG).

The callback can be replaced with a custom one, for example if the data to process is contained in the message itself and not on the storage.

from pyinfra.config.loader import load_settings, parse_settings_path
from pyinfra.examples import start_standard_queue_consumer
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor

processing_function: DataProcessor

arguments = parse_settings_path()
settings = load_settings(arguments.settings_path)

callback = make_download_process_upload_callback(processing_function, settings)
start_standard_queue_consumer(callback, settings)  # optionally also pass a FastAPI app object with preconfigured routes

AMQP input message:

Either use the legacy format with dossierId and fileId as strings or the new format where absolute paths are used. All headers beginning with "X-" are forwarded to the message processor, and returned in the response message (e.g. "X-TENANT-ID" is used to acquire storage information for the tenant).

{
  "targetFilePath": "",
  "responseFilePath": ""
}

or

{
  "dossierId": "",
  "fileId": "",
  "targetFileExtension": "",
  "responseFileExtension": ""
}
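
How a consumer might tell the two formats apart can be sketched as below. resolve_file_paths is a hypothetical helper, and the legacy path layout "{dossierId}/{fileId}.{extension}" is an assumption made for illustration; the actual layout used by pyinfra should be taken from its source.

```python
from typing import Dict, Tuple


def resolve_file_paths(message: Dict[str, str]) -> Tuple[str, str]:
    """Return (target path, response path) for either message format."""
    # New format: absolute paths are given directly.
    if "targetFilePath" in message and "responseFilePath" in message:
        return message["targetFilePath"], message["responseFilePath"]

    # Legacy format: paths are derived from IDs and file extensions.
    legacy_keys = ("dossierId", "fileId", "targetFileExtension", "responseFileExtension")
    missing = [key for key in legacy_keys if key not in message]
    if missing:
        raise ValueError(f"message matches neither format, missing keys: {missing}")

    base = f"{message['dossierId']}/{message['fileId']}"  # assumed layout
    return (
        f"{base}.{message['targetFileExtension']}",
        f"{base}.{message['responseFileExtension']}",
    )


legacy = {
    "dossierId": "d1",
    "fileId": "f1",
    "targetFileExtension": "json.gz",
    "responseFileExtension": "result.json.gz",
}
print(resolve_file_paths(legacy))  # ('d1/f1.json.gz', 'd1/f1.result.json.gz')
```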

Module Installation

Add the respective version of the pyinfra package to your pyproject.toml file. Make sure to add our gitlab registry as a source. For now, all internal packages used by pyinfra also have to be added to the pyproject.toml file (namely kn-utils). Execute poetry lock and poetry install to install the packages.

You can look up the latest version of the package in the gitlab registry. For the used versions of internal dependencies, please refer to the pyproject.toml file.

[tool.poetry.dependencies]
pyinfra = { version = "x.x.x", source = "gitlab-research" }
kn-utils = { version = "x.x.x", source = "gitlab-research" }

[[tool.poetry.source]]
name = "gitlab-research"
url = "https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi/simple"
priority = "explicit"

Scripts

Run pyinfra locally

Shell 1: Start minio and rabbitmq containers

$ cd tests && docker compose up

Shell 2: Start pyinfra with callback mock

$ python scripts/start_pyinfra.py

Shell 3: Upload dummy content on storage and publish message

$ python scripts/send_request.py

Tests

Tests require a running minio and rabbitmq container, meaning you have to run docker compose up in the tests folder before running the tests.
