From 93d899c83c7aad2333e69fff903932e9b7dea008 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 15 Feb 2022 09:38:16 +0100 Subject: [PATCH] rancher ready --- Dockerfile | 3 +- config.yaml | 47 +-------- dotfiles/minimal_conf_rancher.yaml | 162 +++++++++++++++++++++++++++++ mini_queue/consumer.py | 16 --- mini_queue/run.py | 48 ++++++--- 5 files changed, 204 insertions(+), 72 deletions(-) create mode 100644 dotfiles/minimal_conf_rancher.yaml delete mode 100644 mini_queue/consumer.py diff --git a/Dockerfile b/Dockerfile index 92c4861..95c77b6 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.8 as builder +FROM python:3.8 # Use a virtual environment. RUN python -m venv /app/venv @@ -11,6 +11,7 @@ RUN python -m pip install --upgrade pip WORKDIR /app/service COPY . ./ +# Install module & dependencies RUN python3 -m pip install -e . RUN python3 -m pip install -r requirements.txt diff --git a/config.yaml b/config.yaml index 73b2943..c30d39d 100755 --- a/config.yaml +++ b/config.yaml @@ -7,51 +7,12 @@ rabbitmq: heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds queues: # Names of queues for... 
- input: image_request_queue # requests to service - output: image_response_queue # responses by service - dead_letter: image_dead_letter_queue # messages that failed to process - - prefetch_count: 1 - - retry: # Controls retry behaviour for messages the processing of which failed - enabled: $RETRY|True # Toggles retry behaviour - max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue -# TODO: implement - max_interval: $MAX_INTERVAL|15000 # Increase timeout for a message every time it fails to a maximum of this value - -minio: - host: $STORAGE_ENDPOINT|localhost # MinIO host address - port: $STORAGE_PORT|9000 # MinIO host port - user: $STORAGE_KEY|root # MinIO user name - password: $STORAGE_SECRET|password # MinIO user password - bucket: $STORAGE_BUCKET_NAME|redaction # MinIO bucket - -azure_blob_storage: - connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" - container: $STORAGE_AZURECONTAINERNAME|"image-service-v2-test-data" - -sanic: - host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address - process_host: $SANIC_PROCESS_HOST|"127.0.0.1" # Sanic webserver host address for individual service processes - port: $SANIC_PORT|8080 # Sanic webserver host port - check_quantifier: $CHECK_QUANTIFIER|any # Whether all or any service instance needs to pass all checks for a passed master check - cache: false # Whether to cache readiness and health check results - logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING + input: mini_request_queue # requests to service + output: mini_response_queue # responses by service + dead_letter: mini_letter_queue # messages that failed to process service: logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages logfile_path: $LOGFILE_PATH|null # Overwrites the default path 
for the service logfile (image_service/log.log) - verbose: $VERBOSE|True # Service workers print document processing progress to stdout - assert_gpu: $ASSERT_GPU|False # Whether to make a working GPU a mandatory readiness condition - run_id: $RUN_ID|fabfb1f192c745369b88cab34471aba7 # The ID of the mlflow run to load the model from - n_instances: $CONCURRENCY|1 # Number of service top loops that run in parallel (processes, not threads!) - name: $SERVICE_NAME|image-service-v2 # Name of the service in the kubernetes cluster - storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from - model_cache_file: "/root/.keras/models/efficientnetb0_notop.h5" # Workaround to intercept auto-download if model is not cached - batch_size: $BATCH_SIZE|32 # Number of images in memory simultaneously per service instance - minimum_free_memory_percentage: $MINIMUM_FREE_MEMORY_PERCENTAGE|.3 # Minimum allowed percentage of free memory - available_memory: $AVAILABLE_MEMORY|6000 # Available memory in MB - monitor_memory_usage: $MONITOR_MEMORY_USAGE|True # Whether to monitor the memory usage and kill the process when memory is insufficient - -pdftron_license_key: "Knecon AG(en.knecon.swiss):OEM:DDA-R::WL+:AMS(20211029):BECC974307DAB4F34B513BC9B2531B24496F6FCB83CD8AC574358A959730B622FABEF5C7" + name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster diff --git a/dotfiles/minimal_conf_rancher.yaml b/dotfiles/minimal_conf_rancher.yaml new file mode 100644 index 0000000..a23ee21 --- /dev/null +++ b/dotfiles/minimal_conf_rancher.yaml @@ -0,0 +1,162 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + annotations: + meta.helm.sh/release-name: red-research + meta.helm.sh/release-namespace: red-research + labels: + apiVersion: v2 + app: image-service + app.kubernetes.io/instance: red-research + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/name: redaction + helm.sh/chart: redaction + io.cattle.field/appId: red-research + type: 
service + name: mini-queue + namespace: red-research +spec: + selector: + matchLabels: + apiVersion: v2 + app: image-service + io.cattle.field/appId: red-research + template: + metadata: + annotations: + prometheus.io/path: /prometheus + prometheus.io/port: "8080" + prometheus.io/scrape: "true" + labels: + apiVersion: v2 + app: image-service + io.cattle.field/appId: red-research + spec: + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - image-service + topologyKey: kubernetes.io/hostname + weight: 100 + automountServiceAccountToken: false + containers: + - env: + - name: BATCH_SIZE + value: "32" + - name: CONCURRENCY + value: "1" + - name: LOGGING_LEVEL_ROOT + value: DEBUG + - name: MAX_IMAGE_FORMAT + value: "10" + - name: MAX_REL_IMAGE_SIZE + value: "0.75" + - name: MINIMUM_FREE_MEMORY_PERCENTAGE + value: "0.3" + - name: MIN_IMAGE_FORMAT + value: "0.1" + - name: MIN_REL_IMAGE_SIZE + value: "0.05" + - name: MONITORING_ENABLED + value: "true" + - name: MONITOR_MEMORY_USAGE + value: "true" + - name: RABBITMQ_HEARTBEAT + value: "7200" + - name: RABBITMQ_HOST + value: red-research-rabbitmq + - name: RABBITMQ_USERNAME + value: user + - name: RUN_ID + value: fabfb1f192c745369b88cab34471aba7 + - name: STORAGE_BUCKET_NAME + value: redaction + - name: STORAGE_ENDPOINT + value: red-research-minio-headless + - name: VERBOSE + value: "true" + - name: RABBITMQ_PASSWORD + valueFrom: + secretKeyRef: + key: rabbitmq-password + name: red-research-rabbitmq + optional: false + - name: STORAGE_KEY + valueFrom: + secretKeyRef: + key: root-user + name: red-research-minio + optional: false + - name: STORAGE_SECRET + valueFrom: + secretKeyRef: + key: root-password + name: red-research-minio + optional: false + envFrom: + - configMapRef: + name: storage-backend + optional: false + image: nexus.iqser.com:5001/red/mini-queue-service-v1:latest + imagePullPolicy: 
Always + name: mini-queue + ports: + - containerPort: 8080 + name: http + protocol: TCP + resources: + limits: + cpu: "2" + memory: 4000Mi + requests: + cpu: "1" + memory: 2000Mi + securityContext: + allowPrivilegeEscalation: false + capabilities: {} + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 1001 + volumeMounts: + - mountPath: /tmp + name: tmp + - mountPath: /app/service/incl/redai_image/data/tmp + name: data-tmp + - mountPath: /app/service/incl/image_service/data/mlruns/.trash + name: trash + imagePullSecrets: + - name: nexus + initContainers: + - command: + - sh + - -c + - until nc -z -w 10 red-research-rabbitmq 5672; do echo waiting for rabbitmq; + done; echo rabbitmq found + image: nexus.iqser.com:5001/infra/busybox:1.33.1 + imagePullPolicy: Always + name: init-rabbitmq + resources: + limits: + cpu: 100m + memory: 128Mi + requests: + cpu: 50m + memory: 64Mi + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 1001 + volumes: + - emptyDir: {} + name: tmp + - emptyDir: {} + name: data-tmp + - emptyDir: {} + name: trash diff --git a/mini_queue/consumer.py b/mini_queue/consumer.py deleted file mode 100644 index 05f9718..0000000 --- a/mini_queue/consumer.py +++ /dev/null @@ -1,16 +0,0 @@ -import pika -import sys -from retry import retry - -from mini_queue.utils.config import CONFIG - - -def init_params(): - credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - parameters = pika.ConnectionParameters( - host=CONFIG.rabbitmq.host, - port=CONFIG.rabbitmq.port, - heartbeat=CONFIG.rabbitmq.heartbeat, - credentials=credentials, - ) - return parameters diff --git a/mini_queue/run.py b/mini_queue/run.py index e73b4b7..5694905 100644 --- a/mini_queue/run.py +++ b/mini_queue/run.py @@ -1,5 +1,4 @@ -from mini_queue.consumer import init_params -from mini_queue.utils.config import CONFIG +import logging import sys import pika from retry import retry @@ -8,28 
+7,56 @@ from mini_queue.utils.config import CONFIG
 
 
 def callback(channel, method, properties, body):
-    print("Received %r" % body)
+    logging.info(" [R] Received %r" % body)
+    channel.connection.sleep(1)
     response = body
     channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
     channel.basic_ack(delivery_tag=method.delivery_tag)
 
 
 
-if __name__ == "__main__":
+def init_params():
+    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
+    parameters = pika.ConnectionParameters(
+        host=CONFIG.rabbitmq.host,
+        port=CONFIG.rabbitmq.port,
+        heartbeat=CONFIG.rabbitmq.heartbeat,
+        credentials=credentials,
+    )
+    return parameters
 
-    print("startet happy pikachu!")
+
+
+def main():
+
+    logging.info(" [S] Started happy pikachu!")
 
     parameters = init_params()
 
     connection = pika.BlockingConnection(parameters)
     channel = connection.channel()
 
-    channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
+    #channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
 
     while True:
         try:
             channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=callback)
-            print(" [*] Waiting for messages. To exit press CTRL+C")
+            logging.info(" [*] Waiting for messages. To exit press CTRL+C")
             channel.start_consuming()
-        except:
-            pass
+
+        except pika.exceptions.ConnectionClosedByBroker as err:
+            logging.info("Connection closed by broker: {}, retrying...".format(err))
+            continue
+        except pika.exceptions.AMQPChannelError as err:
+            logging.warning("Caught a channel error: {}, stopping...".format(err))
+            break
+        except pika.exceptions.AMQPConnectionError:
+            logging.info("Connection was closed, retrying...")
+            continue
+
+
+
+
+if __name__ == "__main__":
+    logging_level = CONFIG.service.logging_level
+    logging.basicConfig(level=logging_level)
+    main()
\ No newline at end of file