From 8bdad96c71920e7ac69aabba23b47136b929ea91 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 17 Feb 2022 14:52:14 +0100 Subject: [PATCH] retry logic added --- config.yaml | 4 ++ pyinfra/callback.py | 102 ++++++++++++++++++++++++-------------------- pyinfra/consume.py | 6 ++- src/serve.py | 58 +++++++++++++++++++------ 4 files changed, 109 insertions(+), 61 deletions(-) diff --git a/config.yaml b/config.yaml index 62cab56..500ae77 100755 --- a/config.yaml +++ b/config.yaml @@ -12,6 +12,10 @@ rabbitmq: prefetch_count: 2 + retry: # Controls retry behaviour for messages the processing of which failed + enabled: $RETRY|False # Toggles retry behaviour + max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue + minio: host: $STORAGE_ENDPOINT|localhost # MinIO host address port: $STORAGE_PORT|9000 # MinIO host port diff --git a/pyinfra/callback.py b/pyinfra/callback.py index 4546303..47900c4 100644 --- a/pyinfra/callback.py +++ b/pyinfra/callback.py @@ -1,51 +1,62 @@ import json - -from flask import jsonify +import logging +from time import sleep from pyinfra.rabbitmq import make_connection, make_channel, declare_queue +def make_retry_callback(republish, max_attempts): -# def make_request_processor(consumer): -# def get_n_previous_attempts(props): -# return 0 if props.headers is None else props.headers.get("x-retry-count", 0) -# -# def attempts_remain(n_attempts): -# return n_attempts < max_attempts and CONFIG.rabbitmq.retry.enabled -# -# def republish(ch, body, n_current_attempts): -# ch.basic_publish( -# exchange="", -# routing_key=CONFIG.rabbitmq.queues.input, -# body=body, -# properties=pika.BasicProperties(headers={"x-retry-count": n_current_attempts}), -# ) -# -# def on_request(ch, method, props, body): -# -# try: -# response = consumer(body) -# ch.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response) -# ch.basic_ack(delivery_tag=method.delivery_tag) -# -# except Exception as e: -# -# n_attempts = get_n_previous_attempts(props) + 1 -# -# logger.error(f"Message failed to process {n_attempts}/{max_attempts} times. Error: {e}") -# if n_attempts == max_attempts: -# logger.exception(f"Adding to dead letter queue. Last exception: {e}") -# -# if attempts_remain(n_attempts): -# republish(ch, body, n_attempts) -# ch.basic_ack(delivery_tag=method.delivery_tag) -# -# else: -# ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) -# -# max_attempts = CONFIG.rabbitmq.retry.max_attempts -# -# return on_request + def get_n_previous_attempts(props): + return 0 if props.headers is None else props.headers.get("x-retry-count", 0) + + def attempts_remain(n_attempts): + return n_attempts < max_attempts + + def callback(channel, method, properties, body): + + n_attempts = get_n_previous_attempts(properties) + 1 + + logging.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}") + + if attempts_remain(n_attempts): + republish(channel, body, n_attempts) + channel.basic_ack(delivery_tag=method.delivery_tag) + + else: + logging.exception(f"Adding to dead letter queue: {body}") + channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False) + + return callback + + +class AnalysisFailure(Exception): + pass + + +class DataLoadingFailure(Exception): + pass + +class ProcessingFailure(Exception): + pass + + +def wrap_callback_in_retry_logic(callback, retry_callback): + + def wrapped_callback(channel, method, properties, body): + try: + callback(channel, method, properties, body) + except (AnalysisFailure, DataLoadingFailure): + sleep(5) + retry_callback(channel, method, properties, body) + + return wrapped_callback + + +def json_wrap(body_processor): + def inner(payload): + return json.dumps(body_processor(json.loads(payload))) + return inner def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name): @@ -63,9 +74,8 @@ def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_nam return callback -def processor_to_callback(body_processor, output_queue_name): - def wrap_body_processor_on_json_decode_encode_operations(payload): - return json.dumps(body_processor(json.loads(payload))) +def make_retry_callback_for_output_queue(json_wrapped_body_processor, output_queue_name, retry_callback): + callback = make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name) + callback = wrap_callback_in_retry_logic(callback, retry_callback) - callback = make_callback_for_output_queue(wrap_body_processor_on_json_decode_encode_operations, output_queue_name) return callback diff --git a/pyinfra/consume.py b/pyinfra/consume.py index 46729fc..5e7888d 100644 --- a/pyinfra/consume.py +++ b/pyinfra/consume.py @@ -4,6 +4,7 @@ from typing import Callable import pika from retry import retry +from pyinfra.callback import ProcessingFailure from pyinfra.rabbitmq import make_connection, make_channel, declare_queue @@ -11,7 +12,7 @@ class ConsumerError(Exception): pass -# @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) +@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3)) def consume(queue_name: str, on_message_callback: Callable): connection = make_connection() @@ -38,4 +39,5 @@ def consume(queue_name: str, on_message_callback: Callable): logging.debug(err) continue - raise ConsumerError(f"Error while consuming {queue_name}.") + except ProcessingFailure as err: + raise ConsumerError(f"Error while consuming {queue_name}.") from err diff --git a/src/serve.py b/src/serve.py index 62b80cb..fa42e97 100644 --- a/src/serve.py +++ b/src/serve.py @@ -4,11 +4,13 @@ import traceback from multiprocessing import Process from operator import itemgetter +import pika import requests from flask import Flask, jsonify from waitress import serve -from pyinfra.callback import processor_to_callback +from pyinfra.callback import make_retry_callback_for_output_queue, make_retry_callback, json_wrap, \ + make_callback_for_output_queue, AnalysisFailure, DataLoadingFailure, ProcessingFailure from pyinfra.config import CONFIG from pyinfra.consume import consume, ConsumerError from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle @@ -33,7 +35,11 @@ def make_storage_data_loader(storage): return gzip.decompress(data) def load_data(payload): - return decompress(download(payload)) + try: + return decompress(download(payload)) + except Exception as err: + logging.warning(f"Loading data from storage failed for {payload}") + raise DataLoadingFailure() from err return load_data @@ -47,9 +53,7 @@ def make_analyzer(analysis_endpoint): return analysis_response except Exception as err: logging.warning("Exception caught when calling analysis endpoint.") - logging.warning(err) - logging.exception(traceback.format_exc()) - raise err + raise AnalysisFailure() from err return analyze @@ -59,11 +63,16 @@ def make_payload_processor(analysis_endpoint): load_data = make_storage_data_loader(get_storage()) analyze_file = make_analyzer(analysis_endpoint) + @json_wrap def process(payload: dict): logging.info(f"Processing {payload}...") - data = load_data(payload) - predictions = analyze_file(data) - return predictions + try: + data = load_data(payload) + predictions = analyze_file(data) + return predictions + except (DataLoadingFailure, AnalysisFailure) as err: + logging.warning(f"Processing of {payload} failed.") + raise ProcessingFailure() from err return process @@ -108,13 +117,34 @@ def get_storage(): return storage -def main(): - - callback = processor_to_callback( - make_payload_processor(CONFIG.service.analysis_endpoint), output_queue_name=CONFIG.rabbitmq.queues.output +def republish(ch, body, n_current_attempts): + ch.basic_publish( + exchange="", + routing_key=CONFIG.rabbitmq.queues.input, + body=body, + properties=pika.BasicProperties(headers={"x-retry-count": n_current_attempts}), ) - webserver = Process(target=start_integrity_checks_webserver, args=("debug",)) + +def main(): + + json_wrapped_body_processor = make_payload_processor(CONFIG.service.analysis_endpoint) + + if CONFIG.rabbitmq.retry.enabled: + retry_callback = make_retry_callback(republish, max_attempts=CONFIG.rabbitmq.retry.max_attempts) + + callback = make_retry_callback_for_output_queue( + json_wrapped_body_processor=json_wrapped_body_processor, + output_queue_name=CONFIG.rabbitmq.queues.output, + retry_callback=retry_callback + ) + else: + callback = make_callback_for_output_queue( + json_wrapped_body_processor=json_wrapped_body_processor, + output_queue_name=CONFIG.rabbitmq.queues.output + ) + + webserver = Process(target=start_integrity_checks_webserver, args=("production",)) logging.info("Starting webserver...") webserver.start() @@ -133,5 +163,7 @@ if __name__ == "__main__": logging_level = CONFIG.service.logging_level logging.basicConfig(level=logging_level) logging.getLogger("pika").setLevel(logging.ERROR) + logging.getLogger("flask").setLevel(logging.ERROR) + logging.getLogger("urllib3").setLevel(logging.ERROR) main()