Update readme

This commit is contained in:
Julius Unverfehrt 2024-01-23 18:08:57 +01:00
parent be602d8411
commit 725d6dce45
6 changed files with 104 additions and 107 deletions

170
README.md
View File

@ -2,75 +2,89 @@
1. [ About ](#about)
2. [ Configuration ](#configuration)
3. [ Response Format ](#response-format)
4. [ Usage & API ](#usage--api)
3. [ Queue Manager ](#queue-manager)
4. [ Module Installation ](#module-installation)
5. [ Scripts ](#scripts)
6. [ Tests ](#tests)
## About
Common Module with the infrastructure to deploy Research Projects.
The Infrastructure expects to be deployed in the same Pod / local environment as the analysis container and handles all outbound communication.
Shared library for the research team, containing code related to infrastructure and communication with other services.
Offers a simple interface for processing data and sending responses via AMQP, monitoring via Prometheus and storage
access via S3 or Azure.
To start, see the [complete example](pyinfra/examples.py) which shows how to use all features of the service and can be
imported and used directly for default research service pipelines (data ID in message, download data from storage,
upload result while offering Prometheus monitoring, /health and /ready endpoints and multi tenancy support).
## Configuration
A configuration is located in `/config.yaml`. All relevant variables can be configured via exporting environment variables.
Configuration is done via `Dynaconf`. This means that you can use environment variables, a `.env` file or `.toml`
file(s) to configure the service. You can also combine these methods. The precedence is
`environment variables > .env > .toml`. It is recommended to load settings with the provided
[`load_settings`](pyinfra/config/loader.py) function, which you can combine with the provided
[`parse_args`](pyinfra/config/loader.py) function. This allows you to load settings from a `.toml` file or a folder with
`.toml` files and override them with environment variables.
| Environment Variable | Default | Description |
|-------------------------------|----------------------------------|--------------------------------------------------------------------------|
| LOGGING_LEVEL_ROOT | "DEBUG" | Logging level for service logger |
| MONITORING_ENABLED | True | Enables Prometheus monitoring |
| PROMETHEUS_METRIC_PREFIX | "redactmanager_research_service" | Prometheus metric prefix, per convention '{product_name}_{service name}' |
| PROMETHEUS_HOST | "127.0.0.1" | Prometheus webserver address |
| PROMETHEUS_PORT | 8080 | Prometheus webserver port |
| RABBITMQ_HOST | "localhost" | RabbitMQ host address |
| RABBITMQ_PORT | "5672" | RabbitMQ host port |
| RABBITMQ_USERNAME | "user" | RabbitMQ username |
| RABBITMQ_PASSWORD | "bitnami" | RabbitMQ password |
| RABBITMQ_HEARTBEAT | 60 | Controls AMQP heartbeat timeout in seconds |
| RABBITMQ_CONNECTION_SLEEP | 5 | Controls AMQP connection sleep timer in seconds |
| REQUEST_QUEUE | "request_queue" | Requests to service |
| RESPONSE_QUEUE | "response_queue" | Responses by service |
| DEAD_LETTER_QUEUE | "dead_letter_queue" | Messages that failed to process |
| STORAGE_BACKEND | "s3" | The type of storage to use {s3, azure} |
| STORAGE_BUCKET | "redaction" | The bucket / container to pull files specified in queue requests from |
| STORAGE_ENDPOINT | "http://127.0.0.1:9000" | Endpoint for s3 storage |
| STORAGE_KEY | "root" | User for s3 storage |
| STORAGE_SECRET | "password" | Password for s3 storage |
| STORAGE_AZURECONNECTIONSTRING | "DefaultEndpointsProtocol=..." | Connection string for Azure storage |
| STORAGE_AZURECONTAINERNAME | "redaction" | AKS container |
| WRITE_CONSUMER_TOKEN | "False" | Value to see if we should write a consumer token to a file |
The following table shows all necessary settings. You can find a preconfigured settings file for this service in
bitbucket. These are the complete settings, you only need all if using all features of the service as described in
the [complete example](pyinfra/examples.py).
## Response Format
| Environment Variable | Internal / .toml Name | Description |
|------------------------------------|----------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| LOGGING__LEVEL | logging.level | Log level |
| METRICS__PROMETHEUS__ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
| METRICS__PROMETHEUS__PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
| WEBSERVER__HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
| WEBSERVER__PORT | webserver.port | Port of the webserver |
| RABBITMQ__HOST | rabbitmq.host | Host of the RabbitMQ server |
| RABBITMQ__PORT | rabbitmq.port | Port of the RabbitMQ server |
| RABBITMQ__USERNAME | rabbitmq.username | Username for the RabbitMQ server |
| RABBITMQ__PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
| RABBITMQ__HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
| RABBITMQ__CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep time intervals during message processing. Has to be a divider of heartbeat, and shouldn't be too big, since only in these intervals queue interactions happen (like receiving new messages) This is also the minimum time the service needs to process a message. |
| RABBITMQ__INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue |
| RABBITMQ__OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue |
| RABBITMQ__DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue |
| STORAGE__BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
| STORAGE__CACHE_SIZE | storage.cache_size | Number of cached storage connection (to reduce connection stops and reconnects for multi tenancy). |
| STORAGE__S3__BUCKET_NAME | storage.s3.bucket_name | Name of the S3 bucket |
| STORAGE__S3__ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
| STORAGE__S3__KEY | storage.s3.key | Access key for the S3 server |
| STORAGE__S3__SECRET | storage.s3.secret | Secret key for the S3 server |
| STORAGE__S3__REGION | storage.s3.region | Region of the S3 server |
| STORAGE__AZURE__CONTAINER | storage.azure.container_name | Name of the Azure container |
| STORAGE__AZURE__CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
| STORAGE__TENANT_SERVER__PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
| STORAGE__TENANT_SERVER__ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
### Expected AMQP input message:
## Queue Manager
The queue manager is responsible for consuming messages from the input queue, processing them and sending the response
to the output queue. The default callback also downloads data from the storage and uploads the result to the storage.
The response message does not contain the data itself, but the identifiers from the input message (including headers
beginning with "X-").
Usage:
```python
from pyinfra.queue.manager import QueueManager
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
from pyinfra.config.loader import load_settings
settings = load_settings("path/to/settings")
processing_function: DataProcessor # function should expect a dict (json) or bytes (pdf) as input and should return a json serializable object.
queue_manager = QueueManager(settings)
queue_manager.start_consuming(make_download_process_upload_callback(processing_function, settings))
```
### AMQP input message:
Either use the legacy format with dossierId and fileId as strings or the new format where absolute paths are used.
A tenant ID can be optionally provided in the message header (key: "X-TENANT-ID")
```json
{
"targetFilePath": "",
"responseFilePath": ""
}
```
or
```json
{
"dossierId": "",
"fileId": "",
"targetFileExtension": "",
"responseFileExtension": ""
}
```
Optionally, the input message can contain a field with the key `"operations"`.
### AMQP output message:
All headers beginning with "X-" are forwarded to the message processor, and returned in the response message (e.g.
"X-TENANT-ID" is used to acquire storage information for the tenant).
```json
{
@ -84,19 +98,21 @@ or
```json
{
"dossierId": "",
"fileId": ""
"fileId": "",
"targetFileExtension": "",
"responseFileExtension": ""
}
```
## Usage & API
## Module Installation
### Setup
Add the respective version of the pyinfra package to your pyproject.toml file. Make sure to add our gitlab registry as a source.
For now, all internal packages used by pyinfra also have to be added to the pyproject.toml file.
Add the respective version of the pyinfra package to your pyproject.toml file. Make sure to add our gitlab registry as a
source.
For now, all internal packages used by pyinfra also have to be added to the pyproject.toml file (namely kn-utils).
Execute `poetry lock` and `poetry install` to install the packages.
You can look up the latest version of the package in the [gitlab registry](https://gitlab.knecon.com/knecon/research/pyinfra/-/packages).
You can look up the latest version of the package in
the [gitlab registry](https://gitlab.knecon.com/knecon/research/pyinfra/-/packages).
For the used versions of internal dependencies, please refer to the [pyproject.toml](pyproject.toml) file.
```toml
@ -110,45 +126,29 @@ url = "https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi/simple"
priority = "explicit"
```
### API
```python
from pyinfra import loader
from pyinfra.payload_processing.processor import make_payload_processor
from pyinfra.queue.manager import QueueManager
pyinfra_config = config.get_config()
process_payload = make_payload_processor(process_data, config=pyinfra_config)
queue_manager = QueueManager(pyinfra_config)
queue_manager.start_consuming(process_payload)
```
`process_data` should expect a dict (json) or bytes (pdf) as input and should return a list of results.
## Scripts
### Run pyinfra locally
**Shell 1**: Start minio and rabbitmq containers
```bash
$ cd tests && docker-compose up
$ cd tests && docker compose up
```
**Shell 2**: Start pyinfra with callback mock
```bash
$ python scripts/start_pyinfra.py
```
**Shell 3**: Upload dummy content on storage and publish message
```bash
$ python scripts/send_request.py
```
## Tests
Running all tests take a bit longer than you are probably used to, because among other things the required startup times are
quite high for docker-compose dependent tests. This is why the tests are split into two parts. The first part contains all
tests that do not require docker-compose and the second part contains all tests that require docker-compose.
Per default, only the first part is executed, but when releasing a new version, all tests should be executed.
Tests require a running minio and rabbitmq container, meaning you have to run `docker compose up` in the tests folder
before running the tests.

View File

@ -1,3 +1,4 @@
import argparse
import os
from pathlib import Path
from typing import Union
@ -58,3 +59,15 @@ def validate_settings(settings: Dynaconf, validators):
raise ValidationError("Settings validation failed.")
logger.debug("Settings validated.")
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--settings_path",
"-s",
type=Path,
default=pyinfra_config_path,
help="Path to settings file or folder. Must be a .toml file or a folder containing .toml files.",
)
return parser.parse_args()

View File

@ -15,7 +15,7 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr
Supplies /health, /ready and /prometheus endpoints. The process_fn is monitored for processing time per call.
Workload is only received via queue messages. The message contains a file path to the data to be processed, which
gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should
return a json-dump-able object. This object is then uploaded to the storage. The response message is just the
return a json serializable object. This object is then uploaded to the storage. The response message is just the
original message.
Adapt as needed.

View File

@ -14,7 +14,7 @@ def make_download_process_upload_callback(data_processor: DataProcessor, setting
Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage
will be configured to use that tenant id, otherwise the storage is configured as specified in the settings.
The data is the passed to the dataprocessor, together with the message. The dataprocessor should return a
json-dump-able object. This object is then uploaded to the storage as specified in the message.
json serializable object. This object is then uploaded to the storage as specified in the message.
The response message is just the original message.
Adapt as needed.

View File

@ -73,11 +73,11 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, data):
"""Convenience function to upload a file specified in a message payload. For now, only json-dump-able data is
"""Convenience function to upload a file specified in a message payload. For now, only json serializable data is
supported. The storage json consists of the raw_payload, which is extended with a 'data' key, containing the
data to be uploaded.
If the content is not a json-dump-able object, an exception will be raised.
If the content is not a json serializable object, an exception will be raised.
If the result file identifier specifies compression with gzip (.gz), it will be compressed before upload.
This function can be extended in the future as needed (e.g. if we need to upload images), but since further
@ -95,7 +95,7 @@ def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, dat
raise ValueError("No upload file path found in payload, nothing to upload.")
if ".json" not in payload.responseFilePath:
raise ValueError("Only json-dump-able data can be uploaded.")
raise ValueError("Only json serializable data can be uploaded.")
data = {**raw_payload, "data": data}

View File

@ -1,25 +1,9 @@
import argparse
import time
from pathlib import Path
from kn_utils.logging import logger
from pyinfra.config.loader import load_settings, pyinfra_config_path
from pyinfra.config.loader import load_settings, parse_args
from pyinfra.examples import start_queue_consumer_with_prometheus_and_health_endpoints
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--settings_path",
"-s",
type=Path,
default=pyinfra_config_path,
help="Path to settings file or folder. Must be a .toml file or a folder containing .toml files.",
)
return parser.parse_args()
def processor_mock(_data: dict, _message: dict) -> dict:
time.sleep(5)
return {"result1": "result1"}