diff --git a/config.yaml b/config.yaml
index 500ae77..1c0b44b 100755
--- a/config.yaml
+++ b/config.yaml
@@ -5,12 +5,12 @@ rabbitmq:
   password: $RABBITMQ_PASSWORD|bitnami  # RabbitMQ password
   heartbeat: $RABBITMQ_HEARTBEAT|7200  # Controls AMQP heartbeat timeout in seconds
 
-  queues:  # Hack image-service queues
-    input: image_request_queue  # requests to service
-    output: image_response_queue  # responses by service
-    dead_letter: image_dead_letter_queue  # messages that failed to process
+  queues:
+    input: image_request_queue  # Requests to service
+    output: image_response_queue  # Responses by service
+    dead_letter: image_dead_letter_queue  # Messages that failed to process
 
-  prefetch_count: 2
+  prefetch_count: 1
 
   retry:  # Controls retry behaviour for messages the processing of which failed
     enabled: $RETRY|False  # Toggles retry behaviour
@@ -25,15 +25,10 @@ minio:
 service:
   logging_level: $LOGGING_LEVEL_ROOT|DEBUG  # Logging level for log file messages
-  logfile_path: $LOGFILE_PATH|null  # Overwrites the default path for the service logfile (image_service/log.log)
-  name: $SERVICE_NAME|mini-queue-service-v1  # Name of the service in the kubernetes cluster
+  name: $SERVICE_NAME|pyinfra-service-v1  # Name of the service in the kubernetes cluster
   storage_backend: $STORAGE_BACKEND|s3  # The storage to pull files to be processed from
   analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000"
 
 webserver:
-  host: $SANIC_HOST|"0.0.0.0"  # Sanic webserver host address
-  process_host: $SANIC_PROCESS_HOST|"127.0.0.1"  # Sanic webserver host address for individual service processes
-  port: $SANIC_PORT|8080  # Sanic webserver host port
-  check_quantifier: $CHECK_QUANTIFIER|any  # Whether all or any service instance needs to pass all checks for a passed master check
-  cache: false  # Whether to cache readiness and health check results
-  logging_level_sanic: $LOGGING_LEVEL_SANIC|WARNING
+  host: $PROBE_SERVER_HOST|"0.0.0.0"  # Probe webserver address
+  port: $PROBE_SERVER_PORT|8080  # Probe webserver port
diff --git a/pyinfra/callback.py b/pyinfra/callback.py
index 47900c4..685471c 100644
--- a/pyinfra/callback.py
+++ b/pyinfra/callback.py
@@ -2,11 +2,11 @@ import json
 import logging
 from time import sleep
 
+from pyinfra.exceptions import AnalysisFailure, DataLoadingFailure
 from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
 
 
 def make_retry_callback(republish, max_attempts):
-
     def get_n_previous_attempts(props):
         return 0 if props.headers is None else props.headers.get("x-retry-count", 0)
@@ -30,19 +30,7 @@ def make_retry_callback(republish, max_attempts):
     return callback
 
 
-class AnalysisFailure(Exception):
-    pass
-
-
-class DataLoadingFailure(Exception):
-    pass
-
-class ProcessingFailure(Exception):
-    pass
-
-
 def wrap_callback_in_retry_logic(callback, retry_callback):
-
     def wrapped_callback(channel, method, properties, body):
         try:
             callback(channel, method, properties, body)
@@ -56,6 +44,7 @@ def wrap_callback_in_retry_logic(callback, retry_callback):
 def json_wrap(body_processor):
     def inner(payload):
         return json.dumps(body_processor(json.loads(payload)))
+
     return inner
diff --git a/pyinfra/consume.py b/pyinfra/consume.py
index 5e7888d..4e075c5 100644
--- a/pyinfra/consume.py
+++ b/pyinfra/consume.py
@@ -4,7 +4,7 @@ from typing import Callable
 import pika
 from retry import retry
 
-from pyinfra.callback import ProcessingFailure
+from pyinfra.exceptions import ProcessingFailure
 from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
diff --git a/pyinfra/core.py b/pyinfra/core.py
new file mode 100644
index 0000000..d356630
--- /dev/null
+++ b/pyinfra/core.py
@@ -0,0 +1,69 @@
+import gzip
+import logging
+from operator import itemgetter
+
+import requests
+
+from pyinfra.callback import json_wrap
+from pyinfra.exceptions import DataLoadingFailure, AnalysisFailure, ProcessingFailure
+from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
+
+
+def make_storage_data_loader(storage):
+    def get_object_name(payload: dict) -> str:
+        dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
+        object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
+        return object_name
+
+    def download(payload):
+        object_name = get_object_name(payload)
+        logging.debug(f"Downloading {object_name}...")
+        data = storage.get_object(object_name)
+        logging.debug(f"Downloaded {object_name}.")
+        return data
+
+    def decompress(data):
+        return gzip.decompress(data)
+
+    def load_data(payload):
+        try:
+            return decompress(download(payload))
+        except Exception as err:
+            logging.warning(f"Loading data from storage failed for {payload}.")
+            raise DataLoadingFailure() from err
+
+    return load_data
+
+
+def make_analyzer(analysis_endpoint):
+    def analyze(data):
+        try:
+            analysis_response = requests.post(analysis_endpoint, data=data)
+            analysis_response.raise_for_status()
+            return analysis_response.json()
+        except Exception as err:
+            logging.warning("Exception caught when calling analysis endpoint.")
+            raise AnalysisFailure() from err
+
+    return analyze
+
+
+def make_payload_processor(analysis_endpoint):
+    # Imported lazily: src.serve imports this module at load time, so a
+    # module-level import of src.serve here would create a circular import.
+    from src.serve import get_storage
+
+    load_data = make_storage_data_loader(get_storage())
+    analyze_file = make_analyzer(analysis_endpoint)
+
+    @json_wrap
+    def process(payload: dict):
+        logging.info(f"Processing {payload}...")
+        try:
+            data = load_data(payload)
+            predictions = analyze_file(data)
+            return predictions
+        except (DataLoadingFailure, AnalysisFailure) as err:
+            logging.warning(f"Processing of {payload} failed.")
+            raise ProcessingFailure() from err
+
+    return process
diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py
new file mode 100644
index 0000000..b97154f
--- /dev/null
+++ b/pyinfra/exceptions.py
@@ -0,0 +1,14 @@
+class AnalysisFailure(Exception):
+    pass
+
+
+class DataLoadingFailure(Exception):
+    pass
+
+
+class ProcessingFailure(Exception):
+    pass
+
+
+class UnknownStorageBackend(ValueError):
+    pass
diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py
index 95c42b9..a1b8d2d 100644
--- a/pyinfra/storage/storage.py
+++ b/pyinfra/storage/storage.py
@@ -7,7 +7,7 @@ from operator import attrgetter
 from typing import Iterable
 
 from pyinfra.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
-from pyinfra.utils.meta import NoAttemptsLeft, max_attempts
+from pyinfra.utils.retry import NoAttemptsLeft, max_attempts
 
 
 class StorageHandle:
diff --git a/pyinfra/utils/file.py b/pyinfra/utils/file.py
index 66727ce..c3b865a 100644
--- a/pyinfra/utils/file.py
+++ b/pyinfra/utils/file.py
@@ -1,12 +1,6 @@
 """Defines utilities for different operations on files."""
-
-import gzip
-import logging
 import os
-import shutil
-import tempfile
-from operator import itemgetter
 
 
 def provide_directory(path):
@@ -33,55 +27,3 @@ def path_to_compressed_storage_pdf_object_name(path):
     path_no_ext, ext = os.path.splitext(path)
     path_gz = produce_compressed_storage_pdf_object_name(path_no_ext)
     return path_gz
-
-
-def unzip(gz_path, pdf_dir):
-    def inner():
-
-        path, ext = os.path.splitext(gz_path)
-        basename = os.path.basename(path)
-        dossier_id = os.path.basename(os.path.dirname(gz_path))
-        target_dir = os.path.join(pdf_dir, dossier_id)
-        provide_directory(target_dir)
-        target_path = os.path.join(target_dir, basename)
-
-        assert ext == ".gz"
-
-        with gzip.open(gz_path, "rb") as f_in:
-            with open(target_path, "wb") as f_out:
-                shutil.copyfileobj(f_in, f_out)
-
-        logging.debug(f"unzipped {gz_path} into {target_path}")
-
-        return target_path
-
-    try:
-        unzipped_file_path = inner()
-    finally:
-        shutil.rmtree(os.path.dirname(gz_path))
-
-    return unzipped_file_path
-
-
-def download(storage_client, object_name, target_root_dir):
-    logging.debug(f"Downloading {object_name}...")
-    downloaded_file_path = storage_client.download_file(object_name, target_root_dir=target_root_dir)
-    logging.debug(f"Downloaded {object_name} into {downloaded_file_path}.")
-    return downloaded_file_path
-
-
-def download_pdf_from_storage_via_request_payload(storage_client, payload: dict, pdf_dir: str):
-
-    provide_directory(pdf_dir)
-
-    with tempfile.TemporaryDirectory() as pdf_compressed_dir:
-
-        dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
-        object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
-        downloaded_file_path = download(storage_client, object_name, pdf_compressed_dir)
-        unzipped_file_path = unzip(downloaded_file_path, pdf_dir)
-        return unzipped_file_path
-
-
-def get_file_paths(directory):
-    return [os.path.join(directory, f) for f in os.listdir(directory)]
diff --git a/pyinfra/utils/meta.py b/pyinfra/utils/retry.py
similarity index 88%
rename from pyinfra/utils/meta.py
rename to pyinfra/utils/retry.py
index 722935c..0729407 100644
--- a/pyinfra/utils/meta.py
+++ b/pyinfra/utils/retry.py
@@ -1,4 +1,3 @@
-import functools
 import logging
 import time
 from functools import partial, wraps
@@ -6,25 +5,6 @@ from math import exp
 from typing import Tuple, Type, Callable
 
 
-def invocation_counter(var: str):
-    def inner(func):
-        invocation_count = 0
-
-        @functools.wraps(func)
-        def inner(*args, **kwargs):
-            nonlocal invocation_count
-            invocation_count += 1
-
-            if var not in kwargs:
-                kwargs[var] = invocation_count
-
-            return func(*args, **kwargs)
-
-        return inner
-
-    return inner
-
-
 class NoAttemptsLeft(Exception):
     pass
diff --git a/src/serve.py b/src/serve.py
index fa42e97..0c3dffb 100644
--- a/src/serve.py
+++ b/src/serve.py
@@ -1,80 +1,21 @@
-import gzip
 import logging
-import traceback
 from multiprocessing import Process
-from operator import itemgetter
 
 import pika
-import requests
 from flask import Flask, jsonify
 from waitress import serve
 
-from pyinfra.callback import make_retry_callback_for_output_queue, make_retry_callback, json_wrap, \
-    make_callback_for_output_queue, AnalysisFailure, DataLoadingFailure, ProcessingFailure
+from pyinfra.callback import (
+    make_retry_callback_for_output_queue,
+    make_retry_callback,
+    make_callback_for_output_queue,
+)
 from pyinfra.config import CONFIG
 from pyinfra.consume import consume, ConsumerError
+from pyinfra.core import make_payload_processor
+from pyinfra.exceptions import UnknownStorageBackend
 from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle
 from pyinfra.storage.minio import MinioHandle
-from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
-
-
-def make_storage_data_loader(storage):
-    def get_object_name(payload: dict) -> str:
-        dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
-        object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
-        return object_name
-
-    def download(payload):
-        object_name = get_object_name(payload)
-        logging.debug(f"Downloading {object_name}...")
-        data = storage.get_object(object_name)
-        logging.debug(f"Downloaded {object_name}.")
-        return data
-
-    def decompress(data):
-        return gzip.decompress(data)
-
-    def load_data(payload):
-        try:
-            return decompress(download(payload))
-        except Exception as err:
-            logging.warning(f"Loading data from storage failed for {payload}")
-            raise DataLoadingFailure() from err
-
-    return load_data
-
-
-def make_analyzer(analysis_endpoint):
-    def analyze(data):
-        try:
-            analysis_response = requests.post(analysis_endpoint, data=data)
-            analysis_response.raise_for_status()
-            analysis_response = analysis_response.json()
-            return analysis_response
-        except Exception as err:
-            logging.warning("Exception caught when calling analysis endpoint.")
-            raise AnalysisFailure() from err
-
-    return analyze
-
-
-def make_payload_processor(analysis_endpoint):
-
-    load_data = make_storage_data_loader(get_storage())
-    analyze_file = make_analyzer(analysis_endpoint)
-
-    @json_wrap
-    def process(payload: dict):
-        logging.info(f"Processing {payload}...")
-        try:
-            data = load_data(payload)
-            predictions = analyze_file(data)
-            return predictions
-        except (DataLoadingFailure, AnalysisFailure) as err:
-            logging.warning(f"Processing of {payload} failed.")
-            raise ProcessingFailure() from err
-
-    return process
 
 
 # TODO: implement meaningful checks
@@ -100,10 +41,6 @@ def start_integrity_checks_webserver(mode="debug"):
     serve(app, host=CONFIG.webserver.host, port=CONFIG.webserver.port)
 
 
-class UnknownStorageBackend(ValueError):
-    pass
-
-
 def get_storage():
     storage_backend = CONFIG.service.storage_backend
@@ -117,8 +54,8 @@
     return storage
 
 
-def republish(ch, body, n_current_attempts):
-    ch.basic_publish(
+def republish(channel, body, n_current_attempts):
+    channel.basic_publish(
         exchange="",
         routing_key=CONFIG.rabbitmq.queues.input,
         body=body,
@@ -126,7 +63,7 @@
     )
 
 
-def main():
+def make_callback():
 
     json_wrapped_body_processor = make_payload_processor(CONFIG.service.analysis_endpoint)
 
@@ -136,20 +73,24 @@ def main():
         callback = make_retry_callback_for_output_queue(
             json_wrapped_body_processor=json_wrapped_body_processor,
             output_queue_name=CONFIG.rabbitmq.queues.output,
-            retry_callback=retry_callback
+            retry_callback=retry_callback,
         )
     else:
        callback = make_callback_for_output_queue(
-            json_wrapped_body_processor=json_wrapped_body_processor,
-            output_queue_name=CONFIG.rabbitmq.queues.output
+            json_wrapped_body_processor=json_wrapped_body_processor, output_queue_name=CONFIG.rabbitmq.queues.output
        )
+    return callback
+
+
+def main():
+
     webserver = Process(target=start_integrity_checks_webserver, args=("production",))
     logging.info("Starting webserver...")
     webserver.start()
 
     try:
-        consume(CONFIG.rabbitmq.queues.input, callback)
+        consume(CONFIG.rabbitmq.queues.input, make_callback())
     except KeyboardInterrupt:
         pass
     except ConsumerError: