Merge in RR/pyinfra from response to master
Squashed commit of the following:
commit a3056c0df73091ee766491331f07e3a0377e7887
Author: cdietrich <clarissa.dietrich@iqser.com>
Date: Thu Mar 3 10:24:54 2022 +0100
refactor save response as file
97 lines
3.2 KiB
Python
97 lines
3.2 KiB
Python
import json
|
|
import logging
|
|
import tempfile
|
|
from time import sleep
|
|
from pyinfra.config import CONFIG
|
|
|
|
from pyinfra.exceptions import AnalysisFailure, DataLoadingFailure
|
|
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
|
|
from pyinfra.storage.storages import get_storage
|
|
from pyinfra.utils.file import upload_compressed_response
|
|
|
|
|
|
def make_retry_callback(republish, max_attempts):
    """Build a pika-style consumer callback that retries a failed message.

    Args:
        republish: callable ``republish(channel, body, n_attempts)`` that puts
            the message back on the queue carrying the updated retry counter.
        max_attempts: total number of processing attempts allowed before the
            message is rejected (and, per broker config, dead-lettered).

    Returns:
        A callback with the pika signature ``(channel, method, properties, body)``.
    """

    def get_n_previous_attempts(props):
        # The retry counter travels with the message in the "x-retry-count"
        # header; a message that was never retried may carry no headers at all.
        return 0 if props.headers is None else props.headers.get("x-retry-count", 0)

    def attempts_remain(n_attempts):
        return n_attempts < max_attempts

    def callback(channel, method, properties, body):
        n_attempts = get_n_previous_attempts(properties) + 1

        logging.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}")

        if attempts_remain(n_attempts):
            republish(channel, body, n_attempts)
            channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Fix: logging.exception() outside an ``except`` block emits a bogus
            # "NoneType: None" traceback line; logging.error() is correct here.
            logging.error(f"Adding to dead letter queue: {body}")
            channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    return callback
|
|
|
|
|
|
def wrap_callback_in_retry_logic(callback, retry_callback):
    """Decorate ``callback`` so recoverable failures are routed to ``retry_callback``.

    Only AnalysisFailure and DataLoadingFailure trigger the retry path; any
    other exception propagates unchanged to the caller.
    """

    def wrapped_callback(ch, method, props, payload):
        try:
            callback(ch, method, props, payload)
        except (AnalysisFailure, DataLoadingFailure):
            # Brief back-off so a transient outage has a chance to clear
            # before the message is handed to the retry path.
            sleep(5)
            retry_callback(ch, method, props, payload)

    return wrapped_callback
|
|
|
|
|
|
def json_wrap(body_processor):
|
|
def inner(payload):
|
|
return json.dumps(body_processor(json.loads(payload)))
|
|
|
|
return inner
|
|
|
|
|
|
def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name):
    """Build a consumer callback that publishes processed results to an output queue.

    A connection/channel is opened once at build time solely to declare the
    output queue; the returned callback publishes on the consuming channel it
    receives from pika.

    Args:
        json_wrapped_body_processor: callable taking the raw message body and
            returning ``(dossier_id, file_id, result)``.
        output_queue_name: name of the queue the response is published to.

    Returns:
        A callback with the pika signature ``(channel, method, properties, body)``.
    """
    bootstrap_connection = make_connection()
    bootstrap_channel = make_channel(bootstrap_connection)
    declare_queue(bootstrap_channel, output_queue_name)

    def callback(channel, method, _, body):
        """
        response is dossier_id, file_id and analysis result, if CONFIG.service.response.type == "file" the response only
        contains file_id and dossier_id and the analysis result will be written in a json file
        Args:
            channel:
            method:
            _:
            body:

        Returns:

        """
        dossier_id, file_id, result = json_wrapped_body_processor(body)

        # Fall back to "result" when no response key is configured.
        result_key = CONFIG.service.response.key or "result"
        message = json.dumps({"dossierId": dossier_id, "fileId": file_id, result_key: result})

        if CONFIG.service.response.type == "file":
            # Persist the full response to object storage and publish only the
            # identifiers on the queue.
            storage = get_storage(CONFIG.storage.backend)
            upload_compressed_response(storage, CONFIG.storage.bucket, dossier_id, file_id, message)
            message = json.dumps({"dossierId": dossier_id, "fileId": file_id})

        channel.basic_publish(exchange="", routing_key=output_queue_name, body=message)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    return callback
|
|
|
|
|
|
def make_retry_callback_for_output_queue(json_wrapped_body_processor, output_queue_name, retry_callback):
    """Compose an output-queue publishing callback with retry handling.

    Equivalent to building the publish callback and wrapping it so recoverable
    failures are handed to ``retry_callback``.
    """
    publish_callback = make_callback_for_output_queue(json_wrapped_body_processor, output_queue_name)
    return wrap_callback_in_retry_logic(publish_callback, retry_callback)
|