retry logic added

This commit is contained in:
Matthias Bisping 2022-02-17 14:52:14 +01:00
parent 879b80dc0f
commit 8bdad96c71
4 changed files with 109 additions and 61 deletions

View File

@ -12,6 +12,10 @@ rabbitmq:
prefetch_count: 2
retry: # Controls retry behaviour for messages whose processing failed
enabled: $RETRY|False # Toggles retry behaviour
max_attempts: $MAX_ATTEMPTS|3 # Number of times a message may fail before being published to dead letter queue
minio:
host: $STORAGE_ENDPOINT|localhost # MinIO host address
port: $STORAGE_PORT|9000 # MinIO host port

View File

@ -1,51 +1,62 @@
import json
from flask import jsonify
import logging
from time import sleep
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
def make_retry_callback(republish, max_attempts):
    """Build a failure callback that retries messages up to *max_attempts* times.

    Args:
        republish: callable ``republish(channel, body, n_attempts)`` that puts
            the message back on the input queue, carrying the updated attempt
            count (see the ``x-retry-count`` header read below).
        max_attempts: total number of times a message may fail before it is
            rejected without requeue (routing it to the dead-letter queue).

    Returns:
        A ``callback(channel, method, properties, body)`` to invoke when a
        message fails: it either republishes-and-acks, or rejects the message.
    """

    def get_n_previous_attempts(props):
        # The attempt count travels with the message in the "x-retry-count"
        # header; a message without headers has never been retried.
        return 0 if props.headers is None else props.headers.get("x-retry-count", 0)

    def attempts_remain(n_attempts):
        return n_attempts < max_attempts

    def callback(channel, method, properties, body):
        n_attempts = get_n_previous_attempts(properties) + 1
        logging.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}")
        if attempts_remain(n_attempts):
            # Retry: put the message back with the bumped counter, then ack the
            # failed delivery so it is not redelivered as-is.
            republish(channel, body, n_attempts)
            channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Out of attempts: reject without requeue so the broker dead-letters it.
            logging.exception(f"Adding to dead letter queue: {body}")
            channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    return callback
class AnalysisFailure(Exception):
    """Raised when the call to the analysis endpoint fails (see make_analyzer)."""
    pass
class DataLoadingFailure(Exception):
    """Raised when loading message data from storage fails (see make_storage_data_loader)."""
    pass
class ProcessingFailure(Exception):
    """Raised when payload processing fails, wrapping a DataLoadingFailure or AnalysisFailure."""
    pass
def wrap_callback_in_retry_logic(callback, retry_callback, delay_seconds=5):
    """Wrap *callback* so that known transient failures trigger *retry_callback*.

    Args:
        callback: the primary message callback ``(channel, method, properties, body)``.
        retry_callback: invoked with the same arguments when *callback* raises
            an ``AnalysisFailure`` or ``DataLoadingFailure``.
        delay_seconds: back-off before retrying (previously hard-coded to 5);
            gives transient outages a chance to recover.

    Returns:
        The wrapped callback. Other exception types propagate unchanged.
    """

    def wrapped_callback(channel, method, properties, body):
        try:
            callback(channel, method, properties, body)
        except (AnalysisFailure, DataLoadingFailure):
            sleep(delay_seconds)
            retry_callback(channel, method, properties, body)

    return wrapped_callback
def json_wrap(body_processor):
    """Decorator: decode the JSON payload, apply *body_processor*, re-encode the result."""

    def inner(payload):
        decoded = json.loads(payload)
        processed = body_processor(decoded)
        return json.dumps(processed)

    return inner
def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name):
@ -63,9 +74,8 @@ def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_nam
return callback
def processor_to_callback(body_processor, output_queue_name):
def wrap_body_processor_on_json_decode_encode_operations(payload):
return json.dumps(body_processor(json.loads(payload)))
def make_retry_callback_for_output_queue(json_wrapped_body_processor, output_queue_name, retry_callback):
    """Build an output-queue callback wrapped in retry logic.

    Composes make_callback_for_output_queue with wrap_callback_in_retry_logic;
    the stray re-assignment that clobbered the retry-wrapped callback (and
    referenced an undefined processor name) has been removed.

    Args:
        json_wrapped_body_processor: processor already wrapped by json_wrap.
        output_queue_name: queue the processed result is published to.
        retry_callback: handler invoked on retryable failures
            (see wrap_callback_in_retry_logic).

    Returns:
        The retry-aware message callback.
    """
    callback = make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name)
    callback = wrap_callback_in_retry_logic(callback, retry_callback)
    return callback

View File

@ -4,6 +4,7 @@ from typing import Callable
import pika
from retry import retry
from pyinfra.callback import ProcessingFailure
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
@ -11,7 +12,7 @@ class ConsumerError(Exception):
pass
# @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
def consume(queue_name: str, on_message_callback: Callable):
connection = make_connection()
@ -38,4 +39,5 @@ def consume(queue_name: str, on_message_callback: Callable):
logging.debug(err)
continue
raise ConsumerError(f"Error while consuming {queue_name}.")
except ProcessingFailure as err:
raise ConsumerError(f"Error while consuming {queue_name}.") from err

View File

@ -4,11 +4,13 @@ import traceback
from multiprocessing import Process
from operator import itemgetter
import pika
import requests
from flask import Flask, jsonify
from waitress import serve
from pyinfra.callback import processor_to_callback
from pyinfra.callback import make_retry_callback_for_output_queue, make_retry_callback, json_wrap, \
make_callback_for_output_queue, AnalysisFailure, DataLoadingFailure, ProcessingFailure
from pyinfra.config import CONFIG
from pyinfra.consume import consume, ConsumerError
from pyinfra.storage.azure_blob_storage import AzureBlobStorageHandle
@ -33,7 +35,11 @@ def make_storage_data_loader(storage):
return gzip.decompress(data)
def load_data(payload):
return decompress(download(payload))
try:
return decompress(download(payload))
except Exception as err:
logging.warning(f"Loading data from storage failed for {payload}")
raise DataLoadingFailure() from err
return load_data
@ -47,9 +53,7 @@ def make_analyzer(analysis_endpoint):
return analysis_response
except Exception as err:
logging.warning("Exception caught when calling analysis endpoint.")
logging.warning(err)
logging.exception(traceback.format_exc())
raise err
raise AnalysisFailure() from err
return analyze
@ -59,11 +63,16 @@ def make_payload_processor(analysis_endpoint):
load_data = make_storage_data_loader(get_storage())
analyze_file = make_analyzer(analysis_endpoint)
@json_wrap
def process(payload: dict):
logging.info(f"Processing {payload}...")
data = load_data(payload)
predictions = analyze_file(data)
return predictions
try:
data = load_data(payload)
predictions = analyze_file(data)
return predictions
except (DataLoadingFailure, AnalysisFailure) as err:
logging.warning(f"Processing of {payload} failed.")
raise ProcessingFailure() from err
return process
@ -108,13 +117,34 @@ def get_storage():
return storage
def main():
callback = processor_to_callback(
make_payload_processor(CONFIG.service.analysis_endpoint), output_queue_name=CONFIG.rabbitmq.queues.output
def republish(ch, body, n_current_attempts):
    """Put a failed message back on the input queue, tagged with its attempt count.

    The ``x-retry-count`` header carries ``n_current_attempts`` so the retry
    callback can tell how many times this message has already failed.
    """
    retry_properties = pika.BasicProperties(headers={"x-retry-count": n_current_attempts})
    ch.basic_publish(
        exchange="",
        routing_key=CONFIG.rabbitmq.queues.input,
        body=body,
        properties=retry_properties,
    )
webserver = Process(target=start_integrity_checks_webserver, args=("debug",))
def main():
json_wrapped_body_processor = make_payload_processor(CONFIG.service.analysis_endpoint)
if CONFIG.rabbitmq.retry.enabled:
retry_callback = make_retry_callback(republish, max_attempts=CONFIG.rabbitmq.retry.max_attempts)
callback = make_retry_callback_for_output_queue(
json_wrapped_body_processor=json_wrapped_body_processor,
output_queue_name=CONFIG.rabbitmq.queues.output,
retry_callback=retry_callback
)
else:
callback = make_callback_for_output_queue(
json_wrapped_body_processor=json_wrapped_body_processor,
output_queue_name=CONFIG.rabbitmq.queues.output
)
webserver = Process(target=start_integrity_checks_webserver, args=("production",))
logging.info("Starting webserver...")
webserver.start()
@ -133,5 +163,7 @@ if __name__ == "__main__":
logging_level = CONFIG.service.logging_level
logging.basicConfig(level=logging_level)
logging.getLogger("pika").setLevel(logging.ERROR)
logging.getLogger("flask").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
main()