"""Publish analysis requests for every PDF object in an S3 bucket and print the responses.

One JSON message per matching object is published to the configured RabbitMQ input
queue; the script then consumes, prints and acknowledges messages from the configured
output queue.
"""

import argparse
import json

import pika

from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.storage.storages import get_s3_storage


# File extensions put into each request message, keyed by analysis container type.
_EXTENSIONS_BY_CONTAINER = {
    "detr": {"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"},
    "image": {"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"},
    "ner": {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"},
}


def parse_args() -> argparse.Namespace:
    """Parse the command line: the source bucket and the analysis container type."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket_name", "-b", required=True)
    parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True)
    args = parser.parse_args()
    return args


def read_connection_params() -> pika.ConnectionParameters:
    """Build RabbitMQ connection parameters from ``CONFIG.rabbitmq``."""
    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
    parameters = pika.ConnectionParameters(
        host=CONFIG.rabbitmq.host,
        port=CONFIG.rabbitmq.port,
        heartbeat=CONFIG.rabbitmq.heartbeat,
        credentials=credentials,
    )
    return parameters


def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    """Open a channel on ``connection`` with a prefetch count of 1."""
    channel = connection.channel()
    # Have the broker deliver at most one unacknowledged message at a time.
    channel.basic_qos(prefetch_count=1)
    return channel


def declare_queue(channel, queue: str):
    """Declare a durable, non-auto-deleting ``queue`` that dead-letters to the configured queue.

    Rejected/expired messages are routed via the default exchange ("") to
    ``CONFIG.rabbitmq.queues.dead_letter``. Returns the result of ``queue_declare``.
    """
    args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
    return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)


def make_connection() -> pika.BlockingConnection:
    """Open a blocking RabbitMQ connection using :func:`read_connection_params`."""
    parameters = read_connection_params()
    connection = pika.BlockingConnection(parameters)
    return connection


def build_message_bodies(analyse_container_type, bucket_name):
    """Yield one UTF-8 JSON request body per object in ``bucket_name`` whose name contains "pdf".

    Object names are expected to look like ``<dossierId>/<fileId>.<extensions...>``;
    each message carries ``dossierId`` and ``fileId`` plus the target/response file
    extensions for ``analyse_container_type`` (none are added for an unknown type).

    Raises:
        ValueError: if a matching object name does not contain exactly one "/"
            before its first ".".
    """
    extensions = _EXTENSIONS_BY_CONTAINER.get(analyse_container_type, {})
    storage = get_s3_storage()
    # NOTE(review): get_all_object_names presumably yields (bucket, object_name) pairs;
    # only the object name is used here.
    for _, object_name in storage.get_all_object_names(bucket_name):
        if "pdf" not in object_name:
            continue
        stem = object_name.split(".")[0]
        try:
            dossier_id, file_id = stem.split("/")
        except ValueError as err:
            raise ValueError(
                f"Expected object name of the form '<dossierId>/<fileId>.<ext>', got {object_name!r}"
            ) from err
        message = {"dossierId": dossier_id, "fileId": file_id, **extensions}
        yield json.dumps(message).encode()


def main(args):
    """Publish a request for every PDF in the bucket, then print responses from the output queue."""
    input_queue = CONFIG.rabbitmq.queues.input
    output_queue = CONFIG.rabbitmq.queues.output

    connection = make_connection()
    try:
        channel = make_channel(connection)
        declare_queue(channel, input_queue)
        declare_queue(channel, output_queue)

        for body in build_message_bodies(args.analysis_container, args.bucket_name):
            channel.basic_publish("", input_queue, body)
            print(f"Put {body} on {input_queue}")

        # consume() without an inactivity timeout blocks waiting for the next message,
        # so this loop normally runs until the script is interrupted.
        for method_frame, _, body in channel.consume(queue=output_queue):
            print(f"Received {json.loads(body)}")
            channel.basic_ack(method_frame.delivery_tag)
    finally:
        # Release the broker connection on every exit path (including Ctrl-C);
        # closing the connection also closes its open channels.
        if connection.is_open:
            connection.close()


if __name__ == "__main__":
    main(parse_args())