"""Publish analysis requests for every PDF in the configured storage bucket to
RabbitMQ, then print (and ack) the responses that arrive on the response queue."""
import argparse
import json

import pika

from pyinfra.config import get_config
from pyinfra.storage.storage import get_s3_storage

CONFIG = get_config()


def parse_args():
    """Parse command-line arguments.

    Returns:
        argparse.Namespace whose ``operation`` is one of
        "table", "layout", "figure" or "image".
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--operation",
        "-o",
        choices=["table", "layout", "figure", "image"],
        required=True,
    )
    return parser.parse_args()


def read_connection_params():
    """Build pika connection parameters from the RabbitMQ settings in CONFIG."""
    credentials = pika.PlainCredentials(CONFIG.rabbitmq_username, CONFIG.rabbitmq_password)
    return pika.ConnectionParameters(
        host=CONFIG.rabbitmq_host,
        port=CONFIG.rabbitmq_port,
        # NOTE(review): cast presumably because the config value may arrive as a string — confirm.
        heartbeat=int(CONFIG.rabbitmq_heartbeat),
        credentials=credentials,
    )


def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    """Open a channel on ``connection`` that receives at most one unacked message at a time."""
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)
    return channel


def declare_queue(channel, queue: str):
    """Declare a durable, non-auto-deleting ``queue`` with dead-lettering.

    Rejected/expired messages are routed via the default exchange ("") to
    ``CONFIG.dead_letter_queue``.

    Returns:
        The result of ``channel.queue_declare``.
    """
    args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.dead_letter_queue}
    return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)


def make_connection() -> pika.BlockingConnection:
    """Open a blocking RabbitMQ connection using ``read_connection_params()``."""
    return pika.BlockingConnection(read_connection_params())


def _split_object_name(object_name):
    """Split ``"<dossier_id>/<file name>"`` into ``(dossier_id, file_id)``.

    The file id is the file name up to its first ".", e.g.
    ``"d1/f1.ORIGIN.pdf.gz"`` -> ``("d1", "f1")``. Splitting on "/" first
    keeps a dot inside the dossier id from truncating the path.

    Raises:
        ValueError: if the name does not contain exactly one "/".
    """
    parts = object_name.split("/")
    if len(parts) != 2:
        raise ValueError(f"Expected '<dossier_id>/<file>' object name, got {object_name!r}")
    dossier_id, file_name = parts
    return dossier_id, file_name.split(".")[0]


def build_message_bodies(operation, bucket_name):
    """Yield one JSON request body per PDF-like object in ``bucket_name``.

    Args:
        operation: requested analysis ("table", "layout", "figure" or "image").
        bucket_name: storage bucket whose objects are listed.

    Yields:
        ``bytes``: the JSON-serialised request message.

    Raises:
        ValueError: if a matching object name is not ``"<dossier_id>/<file>"``.
    """
    storage = get_s3_storage(CONFIG)
    # Items are 2-tuples; the first element is presumably the bucket name and is
    # unused here (named so it no longer shadows the ``bucket_name`` parameter).
    for _bucket, object_name in storage.get_all_object_names(bucket_name):
        # Substring match so names like "<id>.ORIGIN.pdf.gz" are included.
        if "pdf" not in object_name:
            continue
        dossier_id, file_id = _split_object_name(object_name)
        message = {
            "dossierId": dossier_id,
            "fileId": file_id,
            "targetFileExtension": "ORIGIN.pdf.gz",
            "responseFileExtension": f"{operation.upper()}.json.gz",
        }
        # "image" requests are sent without an explicit "operation" key.
        if operation != "image":
            message["operation"] = operation
        yield json.dumps(message).encode()


def main(args):
    """Publish a request per PDF, then print and ack responses until the queue goes idle.

    Consumption stops once no response has arrived for 1 second.
    """
    connection = make_connection()
    try:
        channel = make_channel(connection)
        declare_queue(channel, CONFIG.request_queue)
        declare_queue(channel, CONFIG.response_queue)

        for body in build_message_bodies(args.operation, CONFIG.storage_bucket):
            channel.basic_publish("", CONFIG.request_queue, body)
            print(f"Put {body} on {CONFIG.request_queue}")

        for method_frame, _, body in channel.consume(queue=CONFIG.response_queue, inactivity_timeout=1):
            # consume() yields (None, None, None) when inactivity_timeout elapses.
            if not body:
                break
            print(f"Received {json.loads(body)}")
            channel.basic_ack(method_frame.delivery_tag)

        # End the consumer started by consume(); pending unacked messages are requeued.
        channel.cancel()
        channel.close()
    finally:
        # Always release the TCP connection, even if publishing/consuming failed.
        if connection.is_open:
            connection.close()


if __name__ == "__main__":
    main(parse_args())