Merge in RR/pyinfra from RED-4124-s3-retry to master
Squashed commit of the following:
commit 40e4dc3712fc692115b8274226430a431bf0192f
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue May 31 12:59:45 2022 +0200
RED-4124 PyInfra now tries three times to download an unobtainable object before republishing the message. Also removed the unreadable stack-trace print when a message is put on the dead-letter queue, since the error is already logged where the exception is raised.
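For reference, the retry behaviour lives in PyInfra's download path, not in the helper script below. A minimal sketch of the idea, assuming a hypothetical storage accessor get_object; the names and the broad except are illustrative, not the actual PyInfra API:

    MAX_DOWNLOAD_ATTEMPTS = 3  # per this commit: three tries before the message is republished

    def download_with_retries(storage, bucket_name, object_name):
        """Try to fetch an object, retrying before the caller republishes the message."""
        for attempt in range(1, MAX_DOWNLOAD_ATTEMPTS + 1):
            try:
                return storage.get_object(bucket_name, object_name)  # hypothetical accessor
            except Exception:  # real code would catch the storage client's not-found error
                if attempt == MAX_DOWNLOAD_ATTEMPTS:
                    raise  # caller republishes; repeated failures end up on the dead-letter queue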
import argparse
import json

import pika

from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.storage.storages import get_s3_storage


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket_name", "-b", required=True)
    parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True)
    args = parser.parse_args()
    return args


def read_connection_params():
    # Build pika connection parameters from the central pyinfra CONFIG.
    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
    parameters = pika.ConnectionParameters(
        host=CONFIG.rabbitmq.host,
        port=CONFIG.rabbitmq.port,
        heartbeat=CONFIG.rabbitmq.heartbeat,
        credentials=credentials,
    )
    return parameters


def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    channel = connection.channel()
    # Fetch at most one unacknowledged message at a time.
    channel.basic_qos(prefetch_count=1)
    return channel


def declare_queue(channel, queue: str):
    # Rejected (or expired) messages are routed to the configured dead-letter queue.
    args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
    return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)
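
# Consumer-side note (not part of this script): the dead-letter arguments above take
# effect when a delivery is rejected without requeueing. A minimal sketch, assuming a
# hypothetical process() handler; names are illustrative:
#
#     method_frame, _, body = channel.basic_get(CONFIG.rabbitmq.queues.input)
#     try:
#         process(body)
#         channel.basic_ack(method_frame.delivery_tag)
#     except Exception:
#         channel.basic_nack(method_frame.delivery_tag, requeue=False)  # routes to dead letter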


def make_connection() -> pika.BlockingConnection:
    parameters = read_connection_params()
    connection = pika.BlockingConnection(parameters)
    return connection


def build_message_bodies(analyse_container_type, bucket_name):
    def update_message(message_dict):
        # Map the analysis container type to the file extensions it reads and writes.
        if analyse_container_type == "detr" or analyse_container_type == "image":
            message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
        if analyse_container_type == "dl_error":
            # Points at a file that does not exist, to provoke a download error.
            message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
        if analyse_container_type == "ner":
            message_dict.update(
                {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
            )
        return message_dict

    storage = get_s3_storage()
    for bucket_name, pdf_name in storage.get_all_object_names(bucket_name):
        if "pdf" not in pdf_name:
            continue
        # Object names look like "<dossierId>/<fileId>.<extensions>".
        file_id = pdf_name.split(".")[0]
        dossier_id, file_id = file_id.split("/")
        message_dict = {"dossierId": dossier_id, "fileId": file_id}
        update_message(message_dict)
        yield json.dumps(message_dict).encode()


def main(args):
    connection = make_connection()
    channel = make_channel(connection)
    declare_queue(channel, CONFIG.rabbitmq.queues.input)
    declare_queue(channel, CONFIG.rabbitmq.queues.output)

    for body in build_message_bodies(args.analysis_container, args.bucket_name):
        channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
        print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")

    # consume() yields (None, None, None) after one second of inactivity,
    # which ends the loop via the falsy body.
    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1):
        if not body:
            break
        print(f"Received {json.loads(body)}")
        channel.basic_ack(method_frame.delivery_tag)
    channel.close()


if __name__ == "__main__":
    main(parse_args())
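A typical invocation of the helper above (the script name is illustrative; broker credentials, queue names, and S3 settings are read from pyinfra.config.CONFIG):

    python send_test_messages.py --bucket_name my-test-bucket --analysis_container ner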