Matthias Bisping 2022-02-17 15:14:51 +01:00
parent 8bdad96c71
commit 4459f87c6e
9 changed files with 113 additions and 183 deletions


@@ -5,12 +5,12 @@ rabbitmq:
password: $RABBITMQ_PASSWORD|bitnami # RabbitMQ password
heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds
queues: # Hack image-service queues
input: image_request_queue # requests to service
output: image_response_queue # responses by service
dead_letter: image_dead_letter_queue # messages that failed to process
queues:
input: image_request_queue # Requests to service
output: image_response_queue # Responses by service
dead_letter: image_dead_letter_queue # Messages that failed to process
prefetch_count: 2
prefetch_count: 1
retry: # Controls retry behaviour for messages whose processing failed
enabled: $RETRY|False # Toggles retry behaviour
@@ -25,15 +25,10 @@ minio:
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for log file messages
logfile_path: $LOGFILE_PATH|null # Overwrites the default path for the service logfile (image_service/log.log)
name: $SERVICE_NAME|mini-queue-service-v1 # Name of the service in the kubernetes cluster
name: $SERVICE_NAME|pyinfra-service-v1 # Name of the service in the kubernetes cluster
storage_backend: $STORAGE_BACKEND|s3 # The storage to pull files to be processed from
analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000"
webserver:
host: $SANIC_HOST|"0.0.0.0" # Sanic webserver host address
process_host: $SANIC_PROCESS_HOST|"127.0.0.1" # Sanic webserver host address for individual service processes
port: $SANIC_PORT|8080 # Sanic webserver host port
check_quantifier: $CHECK_QUANTIFIER|any # Whether all or any service instance needs to pass all checks for a passed master check
cache: false # Whether to cache readiness and health check results
logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING
host: $PROBE_SERVER_HOST|"0.0.0.0" # Probe webserver address
port: $PROBE_SERVER_PORT|8080 # Probe webserver port
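
Note: a minimal sketch, not part of this commit, of how queue settings like the ones above are typically wired into a pika consumer. Only the queue names, the heartbeat and the prefetch_count values come from the config; the host, credentials and handler are assumptions.

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", heartbeat=7200)  # host/credentials assumed
)
channel = connection.channel()
channel.queue_declare(queue="image_request_queue")      # input: requests to service
channel.queue_declare(queue="image_response_queue")     # output: responses by service
channel.queue_declare(queue="image_dead_letter_queue")  # dead_letter: failed messages
channel.basic_qos(prefetch_count=1)                     # matches the new prefetch_count

def on_message(ch, method, properties, body):
    # Placeholder handler; the real service builds its callback via make_callback() below.
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="image_request_queue", on_message_callback=on_message)
channel.start_consuming()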


@@ -2,11 +2,11 @@ import json
import logging
from time import sleep
from pyinfra.exceptions import AnalysisFailure, DataLoadingFailure
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
def make_retry_callback(republish, max_attempts):
def get_n_previous_attempts(props):
return 0 if props.headers is None else props.headers.get("x-retry-count", 0)
@@ -30,19 +30,7 @@ def make_retry_callback(republish, max_attempts):
return callback
class AnalysisFailure(Exception):
pass
class DataLoadingFailure(Exception):
pass
class ProcessingFailure(Exception):
pass
def wrap_callback_in_retry_logic(callback, retry_callback):
def wrapped_callback(channel, method, properties, body):
try:
callback(channel, method, properties, body)
@@ -56,6 +44,7 @@ def wrap_callback_in_retry_logic(callback, retry_callback):
def json_wrap(body_processor):
def inner(payload):
return json.dumps(body_processor(json.loads(payload)))
return inner
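
Note: a short usage sketch for the pieces above. json_wrap lets a body processor work on plain dicts while the queue payloads stay JSON strings, and get_n_previous_attempts reads the "x-retry-count" header that republishing is expected to write. The echo processor and the republish body here are illustrative only.

import pika
from pyinfra.callback import json_wrap

def echo_processor(payload: dict) -> dict:
    # Hypothetical body processor; real processors come from pyinfra.core.
    return {"received": payload}

wrapped = json_wrap(echo_processor)
assert wrapped('{"fileId": 1}') == '{"received": {"fileId": 1}}'

def republish_with_counter(channel, body, n_current_attempts):
    # Sketch of republishing with an incremented retry counter; the actual
    # republish() lives in src/serve.py and uses CONFIG.rabbitmq.queues.input.
    channel.basic_publish(
        exchange="",
        routing_key="image_request_queue",
        body=body,
        properties=pika.BasicProperties(headers={"x-retry-count": n_current_attempts}),
    )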


@@ -4,7 +4,7 @@ from typing import Callable
import pika
from retry import retry
from pyinfra.callback import ProcessingFailure
from pyinfra.exceptions import ProcessingFailure
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue

pyinfra/core.py (new file, +69)

@@ -0,0 +1,69 @@
import gzip
import logging
from operator import itemgetter
import requests
from pyinfra.callback import json_wrap
from pyinfra.exceptions import DataLoadingFailure, AnalysisFailure, ProcessingFailure
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
from src.serve import get_storage
def make_storage_data_loader(storage):
def get_object_name(payload: dict) -> str:
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
return object_name
def download(payload):
object_name = get_object_name(payload)
logging.debug(f"Downloading {object_name}...")
data = storage.get_object(object_name)
logging.debug(f"Downloaded {object_name}.")
return data
def decompress(data):
return gzip.decompress(data)
def load_data(payload):
try:
return decompress(download(payload))
except Exception as err:
logging.warning(f"Loading data from storage failed for {payload}")
raise DataLoadingFailure() from err
return load_data
def make_analyzer(analysis_endpoint):
def analyze(data):
try:
analysis_response = requests.post(analysis_endpoint, data=data)
analysis_response.raise_for_status()
analysis_response = analysis_response.json()
return analysis_response
except Exception as err:
logging.warning("Exception caught when calling analysis endpoint.")
raise AnalysisFailure() from err
return analyze
def make_payload_processor(analysis_endpoint):
load_data = make_storage_data_loader(get_storage())
analyze_file = make_analyzer(analysis_endpoint)
@json_wrap
def process(payload: dict):
logging.info(f"Processing {payload}...")
try:
data = load_data(payload)
predictions = analyze_file(data)
return predictions
except (DataLoadingFailure, AnalysisFailure) as err:
logging.warning(f"Processing of {payload} failed.")
raise ProcessingFailure() from err
return process
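
Note: pyinfra/core.py lifts the processing pipeline out of src/serve.py (the corresponding removal appears further down). Because of @json_wrap, the processor returned by make_payload_processor consumes and produces JSON strings. A hedged usage sketch; the endpoint is the config default, the IDs are made up, and a configured storage backend is assumed.

from pyinfra.core import make_payload_processor
from pyinfra.exceptions import ProcessingFailure

process = make_payload_processor("http://127.0.0.1:5000")  # default analysis_endpoint

try:
    predictions_json = process('{"dossierId": "d-1", "fileId": "f-1"}')  # example IDs
except ProcessingFailure:
    # Storage download or analysis call failed; the consumer decides whether to retry.
    pass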

pyinfra/exceptions.py (new file, +14)

@@ -0,0 +1,14 @@
class AnalysisFailure(Exception):
pass
class DataLoadingFailure(Exception):
pass
class ProcessingFailure(Exception):
pass
class UnknownStorageBackend(ValueError):
pass
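
Note: these exceptions were previously declared in pyinfra/callback.py and src/serve.py; collecting them in pyinfra/exceptions.py gives callers a single import path (see the ProcessingFailure import change in the hunk above). A quick illustration of the chaining pattern core.py uses with them, so the original cause stays inspectable.

from pyinfra.exceptions import DataLoadingFailure, ProcessingFailure

try:
    try:
        raise DataLoadingFailure() from KeyError("object not found")  # stand-in for a storage error
    except DataLoadingFailure as err:
        raise ProcessingFailure() from err
except ProcessingFailure as failure:
    print(type(failure.__cause__))            # DataLoadingFailure
    print(type(failure.__cause__.__cause__))  # KeyError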


@@ -7,7 +7,7 @@ from operator import attrgetter
from typing import Iterable
from pyinfra.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
from pyinfra.utils.meta import NoAttemptsLeft, max_attempts
from pyinfra.utils.retry import NoAttemptsLeft, max_attempts
class StorageHandle:


@@ -1,12 +1,6 @@
"""Defines utilities for different operations on files."""
import gzip
import logging
import os
import shutil
import tempfile
from operator import itemgetter
def provide_directory(path):
@@ -33,55 +27,3 @@ def path_to_compressed_storage_pdf_object_name(path):
path_no_ext, ext = os.path.splitext(path)
path_gz = produce_compressed_storage_pdf_object_name(path_no_ext)
return path_gz
def unzip(gz_path, pdf_dir):
def inner():
path, ext = os.path.splitext(gz_path)
basename = os.path.basename(path)
dossier_id = os.path.basename(os.path.dirname(gz_path))
target_dir = os.path.join(pdf_dir, dossier_id)
provide_directory(target_dir)
target_path = os.path.join(target_dir, basename)
assert ext == ".gz"
with gzip.open(gz_path, "rb") as f_in:
with open(target_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
logging.debug(f"unzipped {gz_path} into {target_path}")
return target_path
try:
unzipped_file_path = inner()
finally:
shutil.rmtree(os.path.dirname(gz_path))
return unzipped_file_path
def download(storage_client, object_name, target_root_dir):
logging.debug(f"Downloading {object_name}...")
downloaded_file_path = storage_client.download_file(object_name, target_root_dir=target_root_dir)
logging.debug(f"Downloaded {object_name} into {downloaded_file_path}.")
return downloaded_file_path
def download_pdf_from_storage_via_request_payload(storage_client, payload: dict, pdf_dir: str):
provide_directory(pdf_dir)
with tempfile.TemporaryDirectory() as pdf_compressed_dir:
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
return unzipped_file_path
def get_file_paths(directory):
return [os.path.join(directory, f) for f in os.listdir(directory)]


@@ -1,4 +1,3 @@
import functools
import logging
import time
from functools import partial, wraps
@@ -6,25 +5,6 @@ from math import exp
from typing import Tuple, Type, Callable
def invocation_counter(var: str):
def inner(func):
invocation_count = 0
@functools.wraps(func)
def inner(*args, **kwargs):
nonlocal invocation_count
invocation_count += 1
if var not in kwargs:
kwargs[var] = invocation_count
return func(*args, **kwargs)
return inner
return inner
class NoAttemptsLeft(Exception):
pass
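
Note: NoAttemptsLeft and max_attempts now come from pyinfra.utils.retry (see the storage hunk above); their actual implementation is not shown in this diff. The following is only a hypothetical shape of such a retry decorator, for orientation.

from functools import wraps

class NoAttemptsLeft(Exception):
    pass

def max_attempts(n):
    # Hypothetical sketch: retry a callable up to n times, then raise NoAttemptsLeft.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_err = None
            for _ in range(n):
                try:
                    return func(*args, **kwargs)
                except Exception as err:  # real code would narrow the exception types
                    last_err = err
            raise NoAttemptsLeft() from last_err
        return wrapper
    return decorator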


@@ -1,80 +1,21 @@
import gzip
import logging
import traceback
from multiprocessing import Process
from operator import itemgetter
import pika
import requests
from flask import Flask, jsonify
from waitress import serve
from pyinfra.callback import make_retry_callback_for_output_queue, make_retry_callback, json_wrap, \
make_callback_for_output_queue, AnalysisFailure, DataLoadingFailure, ProcessingFailure
from pyinfra.callback import (
make_retry_callback_for_output_queue,
make_retry_callback,
make_callback_for_output_queue,
)
from pyinfra.config import CONFIG
from pyinfra.consume import consume, ConsumerError
from pyinfra.core import make_payload_processor
from pyinfra.exceptions import UnknownStorageBackend
from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle
from pyinfra.storage.minio import MinioHandle
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
def make_storage_data_loader(storage):
def get_object_name(payload: dict) -> str:
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
return object_name
def download(payload):
object_name = get_object_name(payload)
logging.debug(f"Downloading {object_name}...")
data = storage.get_object(object_name)
logging.debug(f"Downloaded {object_name}.")
return data
def decompress(data):
return gzip.decompress(data)
def load_data(payload):
try:
return decompress(download(payload))
except Exception as err:
logging.warning(f"Loading data from storage failed for {payload}")
raise DataLoadingFailure() from err
return load_data
def make_analyzer(analysis_endpoint):
def analyze(data):
try:
analysis_response = requests.post(analysis_endpoint, data=data)
analysis_response.raise_for_status()
analysis_response = analysis_response.json()
return analysis_response
except Exception as err:
logging.warning("Exception caught when calling analysis endpoint.")
raise AnalysisFailure() from err
return analyze
def make_payload_processor(analysis_endpoint):
load_data = make_storage_data_loader(get_storage())
analyze_file = make_analyzer(analysis_endpoint)
@json_wrap
def process(payload: dict):
logging.info(f"Processing {payload}...")
try:
data = load_data(payload)
predictions = analyze_file(data)
return predictions
except (DataLoadingFailure, AnalysisFailure) as err:
logging.warning(f"Processing of {payload} failed.")
raise ProcessingFailure() from err
return process
# TODO: implement meaningful checks
@@ -100,10 +41,6 @@ def start_integrity_checks_webserver(mode="debug"):
serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port)
class UnknownStorageBackend(ValueError):
pass
def get_storage():
storage_backend = CONFIG.service.storage_backend
@@ -117,8 +54,8 @@ def get_storage():
return storage
def republish(ch, body, n_current_attempts):
ch.basic_publish(
def republish(channel, body, n_current_attempts):
channel.basic_publish(
exchange="",
routing_key=CONFIG.rabbitmq.queues.input,
body=body,
@@ -126,7 +63,7 @@ def republish(ch, body, n_current_attempts):
)
def main():
def make_callback():
json_wrapped_body_processor = make_payload_processor(CONFIG.service.analysis_endpoint)
@@ -136,20 +73,24 @@ def main():
callback = make_retry_callback_for_output_queue(
json_wrapped_body_processor=json_wrapped_body_processor,
output_queue_name=CONFIG.rabbitmq.queues.output,
retry_callback=retry_callback
retry_callback=retry_callback,
)
else:
callback = make_callback_for_output_queue(
json_wrapped_body_processor=json_wrapped_body_processor,
output_queue_name=CONFIG.rabbitmq.queues.output
json_wrapped_body_processor=json_wrapped_body_processor, output_queue_name=CONFIG.rabbitmq.queues.output
)
return callback
def main():
webserver = Process(target=start_integrity_checks_webserver, args=("production",))
logging.info("Starting webserver...")
webserver.start()
try:
consume(CONFIG.rabbitmq.queues.input, callback)
consume(CONFIG.rabbitmq.queues.input, make_callback())
except KeyboardInterrupt:
pass
except ConsumerError: