refactor: IoC for callback, update readme

This commit is contained in:
Julius Unverfehrt 2024-01-25 10:41:48 +01:00
parent f6f56b8d8c
commit b2f073e0c5
5 changed files with 85 additions and 61 deletions

View File

@ -7,7 +7,6 @@
5. [ Scripts ](#scripts) 5. [ Scripts ](#scripts)
6. [ Tests ](#tests) 6. [ Tests ](#tests)
## About ## About
Shared library for the research team, containing code related to infrastructure and communication with other services. Shared library for the research team, containing code related to infrastructure and communication with other services.
@ -31,42 +30,44 @@ The following table shows all necessary settings. You can find a preconfigured s
bitbucket. These are the complete settings, you only need all if using all features of the service as described in bitbucket. These are the complete settings, you only need all if using all features of the service as described in
the [complete example](pyinfra/examples.py). the [complete example](pyinfra/examples.py).
| Environment Variable | Internal / .toml Name | Description | | Environment Variable | Internal / .toml Name | Description |
|------------------------------------|----------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |--------------------------------------|------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| LOGGING__LEVEL | logging.level | Log level | | LOGGING__LEVEL | logging.level | Log level |
| METRICS__PROMETHEUS__ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection | | METRICS__PROMETHEUS__ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
| METRICS__PROMETHEUS__PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) | | METRICS__PROMETHEUS__PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
| WEBSERVER__HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) | | WEBSERVER__HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
| WEBSERVER__PORT | webserver.port | Port of the webserver | | WEBSERVER__PORT | webserver.port | Port of the webserver |
| RABBITMQ__HOST | rabbitmq.host | Host of the RabbitMQ server | | RABBITMQ__HOST | rabbitmq.host | Host of the RabbitMQ server |
| RABBITMQ__PORT | rabbitmq.port | Port of the RabbitMQ server | | RABBITMQ__PORT | rabbitmq.port | Port of the RabbitMQ server |
| RABBITMQ__USERNAME | rabbitmq.username | Username for the RabbitMQ server | | RABBITMQ__USERNAME | rabbitmq.username | Username for the RabbitMQ server |
| RABBITMQ__PASSWORD | rabbitmq.password | Password for the RabbitMQ server | | RABBITMQ__PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
| RABBITMQ__HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server | | RABBITMQ__HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
| RABBITMQ__CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep time intervals during message processing. Has to be a divider of heartbeat, and shouldn't be too big, since only in these intervals queue interactions happen (like receiving new messages) This is also the minimum time the service needs to process a message. | | RABBITMQ__CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep time intervals during message processing. Has to be a divider of heartbeat, and shouldn't be too big, since only in these intervals queue interactions happen (like receiving new messages) This is also the minimum time the service needs to process a message. |
| RABBITMQ__INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue | | RABBITMQ__INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue |
| RABBITMQ__OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue | | RABBITMQ__OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue |
| RABBITMQ__DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue | | RABBITMQ__DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue |
| STORAGE__BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) | | STORAGE__BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
| STORAGE__CACHE_SIZE | storage.cache_size | Number of cached storage connection (to reduce connection stops and reconnects for multi tenancy). | | STORAGE__S3__BUCKET | storage.s3.bucket | Name of the S3 bucket |
| STORAGE__S3__BUCKET_NAME | storage.s3.bucket_name | Name of the S3 bucket | | STORAGE__S3__ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
| STORAGE__S3__ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server | | STORAGE__S3__KEY | storage.s3.key | Access key for the S3 server |
| STORAGE__S3__KEY | storage.s3.key | Access key for the S3 server | | STORAGE__S3__SECRET | storage.s3.secret | Secret key for the S3 server |
| STORAGE__S3__SECRET | storage.s3.secret | Secret key for the S3 server | | STORAGE__S3__REGION | storage.s3.region | Region of the S3 server |
| STORAGE__S3__REGION | storage.s3.region | Region of the S3 server | | STORAGE__AZURE__CONTAINER | storage.azure.container_name | Name of the Azure container |
| STORAGE__AZURE__CONTAINER | storage.azure.container_name | Name of the Azure container | | STORAGE__AZURE__CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
| STORAGE__AZURE__CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server | | STORAGE__TENANT_SERVER__PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
| STORAGE__TENANT_SERVER__PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server | | STORAGE__TENANT_SERVER__ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
| STORAGE__TENANT_SERVER__ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server | | TRACING__OPENTELEMETRY__ENDPOINT | tracing.opentelemetry.endpoint | Endpoint to which OpenTelemetry traces are exported
| TRACING__ENDPOINT | tracing.endpoint | Endpoint to which OpenTelemetry traces are exported | TRACING__OPENTELEMETRY__SERVICE_NAME | tracing.opentelemetry.service_name | Name of the service as displayed in the traces collected
| TRACING__SERVER_NAME | tracing.server_name | Name of the service as displayed in the traces collected
### OpenTelemetry ### OpenTelemetry
Open telemetry (via its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be configured Open telemetry (via its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be
from environment variables, without additional work in the microservice app, although additional configuration is possible. configured
from environment variables, without additional work in the microservice app, although additional configuration is
possible.
`TRACING_ENDPOINT` should typically be set to `http://otel-collector-opentelemetry-collector.otel-collector:4318/v1/traces`. `TRACING__OPENTELEMETRY__ENDPOINT` should typically be set
to `http://otel-collector-opentelemetry-collector.otel-collector:4318/v1/traces`.
## Queue Manager ## Queue Manager
@ -75,7 +76,7 @@ to the output queue. The default callback also downloads data from the storage a
The response message does not contain the data itself, but the identifiers from the input message (including headers The response message does not contain the data itself, but the identifiers from the input message (including headers
beginning with "X-"). beginning with "X-").
Usage: ### Standalone Usage
```python ```python
from pyinfra.queue.manager import QueueManager from pyinfra.queue.manager import QueueManager
@ -86,11 +87,38 @@ settings = load_settings("path/to/settings")
processing_function: DataProcessor # function should expect a dict (json) or bytes (pdf) as input and should return a json serializable object. processing_function: DataProcessor # function should expect a dict (json) or bytes (pdf) as input and should return a json serializable object.
queue_manager = QueueManager(settings) queue_manager = QueueManager(settings)
queue_manager.start_consuming(make_download_process_upload_callback(processing_function, settings)) callback = make_download_process_upload_callback(processing_function, settings)
queue_manager.start_consuming(callback)
``` ```
### Usage in a Service
This is the recommended way to use the module. This includes the webserver, Prometheus metrics and health endpoints.
Custom endpoints can be added by adding a new route to the `app` object beforehand. Settings are loaded from files
specified as CLI arguments (e.g. `--settings-path path/to/settings.toml`). The values can also be set or overridden via
environment variables (e.g. `LOGGING__LEVEL=DEBUG`).
The callback can be replaced with a custom one, for example if the data to process is contained in the message itself
and not on the storage.
```python
from pyinfra.config.loader import load_settings, parse_args
from pyinfra.examples import start_standard_queue_consumer
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
processing_function: DataProcessor
arguments = parse_args()
settings = load_settings(arguments.settings_path)
callback = make_download_process_upload_callback(processing_function, settings)
start_standard_queue_consumer(callback, settings) # optionally also pass a fastAPI app object with preconfigured routes
```
### AMQP input message: ### AMQP input message:
Either use the legacy format with dossierId and fileId as strings or the new format where absolute paths are used. Either use the legacy format with dossierId and fileId as strings or the new format where absolute paths are used.
All headers beginning with "X-" are forwarded to the message processor, and returned in the response message (e.g. All headers beginning with "X-" are forwarded to the message processor, and returned in the response message (e.g.
"X-TENANT-ID" is used to acquire storage information for the tenant). "X-TENANT-ID" is used to acquire storage information for the tenant).

View File

@ -3,7 +3,7 @@ from fastapi import FastAPI
from kn_utils.logging import logger from kn_utils.logging import logger
from pyinfra.config.loader import get_all_validators, validate_settings from pyinfra.config.loader import get_all_validators, validate_settings
from pyinfra.queue.callback import DataProcessor, make_download_process_upload_callback from pyinfra.queue.callback import Callback
from pyinfra.queue.manager import QueueManager from pyinfra.queue.manager import QueueManager
from pyinfra.utils.opentelemetry import instrument_pika, setup_trace from pyinfra.utils.opentelemetry import instrument_pika, setup_trace
from pyinfra.webserver.prometheus import ( from pyinfra.webserver.prometheus import (
@ -17,20 +17,16 @@ from pyinfra.webserver.utils import (
def start_standard_queue_consumer( def start_standard_queue_consumer(
process_fn: DataProcessor, callback: Callback,
settings: Dynaconf, settings: Dynaconf,
app: FastAPI = None, app: FastAPI = None,
): ):
"""Default serving logic for research services. """Default serving logic for research services.
Supplies /health, /ready and /prometheus endpoints (if enabled). The process_fn is monitored for processing time per Supplies /health, /ready and /prometheus endpoints (if enabled). The callback is monitored for processing time per
call. Also traces the queue messages via openTelemetry (if enabled). message. Also traces the queue messages via openTelemetry (if enabled).
Workload is only received via queue messages. The message contains a file path to the data to be processed, which Workload is received via queue messages and processed by the callback function (see pyinfra.queue.callback for
gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should callbacks).
return a json serializable object. This object is then uploaded to the storage. The response message is just the
original message.
Adapt as needed.
""" """
validate_settings(settings, get_all_validators()) validate_settings(settings, get_all_validators())
@ -43,7 +39,7 @@ def start_standard_queue_consumer(
if settings.metrics.prometheus.enabled: if settings.metrics.prometheus.enabled:
logger.info(f"Prometheus metrics enabled.") logger.info(f"Prometheus metrics enabled.")
app = add_prometheus_endpoint(app) app = add_prometheus_endpoint(app)
process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn) callback = make_prometheus_processing_time_decorator_from_settings(settings)(callback)
if settings.tracing.opentelemetry.enabled: if settings.tracing.opentelemetry.enabled:
logger.info(f"OpenTelemetry tracing enabled.") logger.info(f"OpenTelemetry tracing enabled.")
@ -55,5 +51,4 @@ def start_standard_queue_consumer(
webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread = create_webserver_thread_from_settings(app, settings)
webserver_thread.start() webserver_thread.start()
callback = make_download_process_upload_callback(process_fn, settings)
queue_manager.start_consuming(callback) queue_manager.start_consuming(callback)

View File

@ -9,18 +9,18 @@ from pyinfra.storage.utils import (
upload_data_as_specified_in_message, upload_data_as_specified_in_message,
) )
DataProcessor = Callable[[Union[dict, bytes], dict], dict] DataProcessor = Callable[[Union[dict, bytes], dict], Union[dict, list, str]]
Callback = Callable[[dict], dict]
def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf): def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf) -> Callback:
"""Default callback for processing queue messages. """Default callback for processing queue messages.
Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage
will be configured to use that tenant id, otherwise the storage is configured as specified in the settings. will be configured to use that tenant id, otherwise the storage is configured as specified in the settings.
The data is then passed to the dataprocessor, together with the message. The dataprocessor should return a The data is then passed to the dataprocessor, together with the message. The dataprocessor should return a
json serializable object. This object is then uploaded to the storage as specified in the message. json serializable object. This object is then uploaded to the storage as specified in the message. The response
message is just the original message.
The response message is just the original message.
Adapt as needed.
""" """
def inner(queue_message_payload: dict) -> dict: def inner(queue_message_payload: dict) -> dict:

View File

@ -2,7 +2,6 @@ import json
from dynaconf import Dynaconf from dynaconf import Dynaconf
from fastapi import FastAPI from fastapi import FastAPI
from kn_utils.logging import logger
from opentelemetry import trace from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
@ -43,10 +42,11 @@ def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExpo
processor = BatchSpanProcessor(exporter) processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor) provider.add_span_processor(processor)
# TODO: This produces a warning if trying to set the provider twice. # TODO: trace.set_tracer_provider produces a warning if trying to set the provider twice.
# "WARNING opentelemetry.trace:__init__.py:521 Overriding of current TracerProvider is not allowed" # "WARNING opentelemetry.trace:__init__.py:521 Overriding of current TracerProvider is not allowed"
# This doesn't affect our current usage but should be fixed eventually. # This doesn't seem to affect the functionality since we only want to use the tracer provider set in the beginning.
trace.set_tracer_provider(provider) # We work around the log message by using the protected method with log=False.
trace._set_tracer_provider(provider, log=False)
def get_exporter(settings: Dynaconf): def get_exporter(settings: Dynaconf):

View File

@ -1,9 +1,8 @@
import time import time
from dynaconf import Dynaconf
from pyinfra.config.loader import load_settings, parse_args from pyinfra.config.loader import load_settings, parse_args
from pyinfra.examples import start_standard_queue_consumer from pyinfra.examples import start_standard_queue_consumer
from pyinfra.queue.callback import make_download_process_upload_callback
def processor_mock(_data: dict, _message: dict) -> dict: def processor_mock(_data: dict, _message: dict) -> dict:
@ -14,4 +13,6 @@ def processor_mock(_data: dict, _message: dict) -> dict:
if __name__ == "__main__": if __name__ == "__main__":
arguments = parse_args() arguments = parse_args()
settings = load_settings(arguments.settings_path) settings = load_settings(arguments.settings_path)
start_standard_queue_consumer(processor_mock, settings)
callback = make_download_process_upload_callback(processor_mock, settings)
start_standard_queue_consumer(callback, settings)