feat(opentel,dynaconf): adapt new pyinfra

Also changes logging to knutils logging.
This commit is contained in:
Julius Unverfehrt 2024-02-08 17:16:08 +01:00
parent 6163e29d6b
commit 499c501acf
7 changed files with 1822 additions and 1461 deletions

44
config/pyinfra.toml Normal file
View File

@@ -0,0 +1,44 @@
[metrics.prometheus]
enabled = true
prefix = "redactmanager_image_service"
[tracing.opentelemetry]
enabled = true
endpoint = "http://otel-collector-opentelemetry-collector.otel-collector:4318/v1/traces"
service_name = "redactmanager_image_service"
exporter = "otlp"
[webserver]
host = "0.0.0.0"
port = 8080
[rabbitmq]
host = "localhost"
port = 5672
username = ""
password = ""
heartbeat = 60
# Has to be a divisor of heartbeat, and shouldn't be too big, since queue interactions (like receiving new messages) only happen at these intervals
# This is also the minimum time the service needs to process a message
connection_sleep = 5
input_queue = "request_queue"
output_queue = "response_queue"
dead_letter_queue = "dead_letter_queue"
[storage]
backend = "s3"
[storage.s3]
bucket = "redaction"
endpoint = "http://127.0.0.1:9000"
key = ""
secret = ""
region = "eu-central-1"
[storage.azure]
container = "redaction"
connection_string = ""
[storage.tenant_server]
public_key = ""
endpoint = "http://tenant-user-management:8081/internal-api/tenants"

28
config/settings.toml Normal file
View File

@@ -0,0 +1,28 @@
[logging]
level = "INFO"
[service]
# Print document processing progress to stdout
verbose = false
batch_size = 16
mlflow_run_id = "fabfb1f192c745369b88cab34471aba7"
# These variables control filters that are applied to either images, image metadata or service_estimator predictions.
# The filter result values are reported in the service responses. For convenience the response to a request contains a
# "filters.allPassed" field, which is set to false if any of the values returned by the filters did not meet its
# specified required value.
[filters]
# Minimum permissible prediction confidence
min_confidence = 0.5
# Image size to page size ratio (ratio of geometric means of areas)
[filters.image_to_page_quotient]
min = 0.05
max = 0.75
# Image width to height ratio
[filters.image_width_to_height_quotient]
min = 0.1
max = 10

View File

@@ -1,46 +1,6 @@
"""Implements a config object with dot-indexing syntax."""
from pathlib import Path
from pyinfra.config.loader import load_settings
from envyaml import EnvYAML
from image_prediction.locations import CONFIG_FILE
def _get_item_and_maybe_make_dotindexable(container, item):
    """Fetch *item* from *container*; wrap dict values so dot-access chains work."""
    value = container[item]
    if isinstance(value, dict):
        return DotIndexable(value)
    return value
class DotIndexable:
    """Wrapper giving attribute-style (dot) access to a dict-like object.

    Nested dicts are wrapped on the fly, so chained access such as
    ``cfg.section.key`` keeps working at every level.
    """

    def __init__(self, x):
        self.x = x

    def _lookup(self, item):
        # Wrap nested dicts so the next dot-access is also dot-indexable.
        value = self.x[item]
        return DotIndexable(value) if isinstance(value, dict) else value

    def get(self, item, default=None):
        """Like dict.get: return *default* instead of raising on a missing key."""
        try:
            return self._lookup(item)
        except KeyError:
            return default

    def __getattr__(self, item):
        # Only reached for names not found via normal attribute lookup.
        return self._lookup(item)

    def __repr__(self):
        return repr(self.x)

    def __getitem__(self, item):
        return self.__getattr__(item)
class Config:
    """Loads a YAML config via EnvYAML and exposes it with dot/item access."""

    def __init__(self, config_path):
        # EnvYAML parses the file and substitutes environment variables.
        self.__config = EnvYAML(config_path)

    def __getattr__(self, item):
        # NOTE(review): unknown keys silently yield None (implicit return) —
        # confirm callers rely on this rather than expecting AttributeError.
        if item not in self.__config:
            return None
        return _get_item_and_maybe_make_dotindexable(self.__config, item)

    def __getitem__(self, item):
        return self.__getattr__(item)
# NOTE(review): diff residue — this commit replaces the EnvYAML-based CONFIG
# (line below, removed) with pyinfra's load_settings (new); only one binding
# should exist in the merged file.
CONFIG = Config(CONFIG_FILE)
# Repository root: one directory above this module's package directory.
local_root_path = Path(__file__).parents[1]
CONFIG = load_settings(root_path=local_root_path, settings_path="config")

View File

@@ -1,27 +1,4 @@
import logging
import kn_utils
from image_prediction.config import CONFIG
def make_logger_getter():
    """Configure the shared "imclf" logger and return a zero-arg accessor.

    Returns:
        A callable ``get_logger()`` that returns the configured
        ``logging.Logger`` instance.
    """
    logger = logging.getLogger("imclf")
    # Don't bubble records up to the root logger (avoids double output).
    logger.propagate = False
    # BUG FIX: logging.getLogger("imclf") returns the same singleton on every
    # call, so invoking this factory more than once used to stack duplicate
    # StreamHandlers and emit every record multiple times. Configure handlers
    # only on first use.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setLevel(CONFIG.service.logging_level)
        log_format = "%(asctime)s %(levelname)-8s %(message)s"
        formatter = logging.Formatter(log_format, datefmt="%Y-%m-%d %H:%M:%S")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    logger.setLevel(CONFIG.service.logging_level)

    def get_logger():
        return logger

    return get_logger
# NOTE(review): diff residue — the locally built getter (line below, removed)
# is replaced by kn_utils' implementation; only one binding should survive.
get_logger = make_logger_getter()
# TODO: remove this module and use the `get_logger` function from the `kn_utils` package.
get_logger = kn_utils.get_logger

3103
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@ packages = [{ include = "image_prediction" }]
[tool.poetry.dependencies]
python = ">=3.10,<3.11"
pyinfra = { version = "1.10.0", source = "gitlab-research" }
pyinfra = { version = "2.0.0", source = "gitlab-research" }
kn-utils = { version = "0.2.7", source = "gitlab-research" }
dvc = "^2.34.0"
dvc-ssh = "^2.20.0"

View File

@@ -1,17 +1,16 @@
from image_prediction import logger
from image_prediction.config import Config
from image_prediction.locations import CONFIG_FILE
from sys import stdout
from kn_utils.logging import logger
from pyinfra.examples import start_standard_queue_consumer
from pyinfra.queue.callback import make_download_process_upload_callback
from image_prediction.config import CONFIG
from image_prediction.pipeline import load_pipeline
from image_prediction.utils.banner import load_banner
from image_prediction.utils.process_wrapping import wrap_in_process
from pyinfra import config
from pyinfra.payload_processing.processor import make_payload_processor
from pyinfra.queue.queue_manager import QueueManager
# NOTE(review): diff residue — the pyinfra-1.x setup (next three lines,
# removed) is replaced by the kn_utils logger configuration (last two lines).
PYINFRA_CONFIG = config.get_config()
IMAGE_CONFIG = Config(CONFIG_FILE)
logger.setLevel(PYINFRA_CONFIG.logging_level_root)
# Presumably a loguru-style logger: drop the default sink, then log to stdout
# at the level from settings.toml — TODO confirm against kn_utils.logging.
logger.remove()
logger.add(sink=stdout, level=CONFIG.logging.level)
# A component of the processing pipeline (probably tensorflow) does not release allocated memory (see RED-4206).
@@ -19,18 +18,16 @@ logger.setLevel(PYINFRA_CONFIG.logging_level_root)
# Workaround: Manage Memory with the operating system, by wrapping the processing in a sub-process.
# FIXME: Find a more fine-grained solution, or, if the problem persists across python services, address it at a shared infrastructure level.
# Run the pipeline in a disposable sub-process so the OS reclaims memory that
# the pipeline (see RED-4206 note above) never releases.
@wrap_in_process
# Old signature (removed by this commit):
def process_data(data: bytes) -> list:
pipeline = load_pipeline(verbose=IMAGE_CONFIG.service.verbose, batch_size=IMAGE_CONFIG.service.batch_size)
# New signature: takes an extra `_message: dict` (underscore-prefixed, unused
# here) — presumably required by the pyinfra 2.x callback contract; confirm.
def process_data(data: bytes, _message: dict) -> list:
pipeline = load_pipeline(verbose=CONFIG.service.verbose, batch_size=CONFIG.service.batch_size)
# Materialize the generator so all processing completes inside the sub-process.
return list(pipeline(data))
def main():
# Service entry point: print the startup banner, then consume the queue forever.
logger.info(load_banner())
# Old pyinfra-1.x wiring (removed by this commit):
process_payload = make_payload_processor(process_data, config=PYINFRA_CONFIG)
queue_manager = QueueManager(PYINFRA_CONFIG)
queue_manager.start_consuming(process_payload)
# New pyinfra-2.x wiring: download/process/upload callback plugged into the
# standard queue consumer, both configured from CONFIG.
callback = make_download_process_upload_callback(process_data, CONFIG)
start_standard_queue_consumer(callback, CONFIG)
if __name__ == "__main__":