Merge in RR/pyinfra from change-queue-durability to master
Squashed commit of the following:
commit b01e5e05e27776cbb629ba3c2e757a64934ace30
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:50:40 2022 +0100
refactor
commit 549b89952e626ef7b1029c97ec7fb527112cc2b2
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:47:13 2022 +0100
black
commit 6a36f56a3e6308f4954a28c1bf4797a838986371
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:46:51 2022 +0100
adjust mock scriptis,add different file format option for IO
commit 3b50f88482d74ea39240cdd41cc7dd62076c87b7
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Mar 17 12:45:49 2022 +0100
change request_queue to durable=True
80 lines
2.7 KiB
Python
80 lines
2.7 KiB
Python
import argparse
|
|
import json
|
|
|
|
import pika
|
|
|
|
from pyinfra.config import CONFIG
|
|
from pyinfra.storage.storages import get_s3_storage
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True)
|
|
args = parser.parse_args()
|
|
return args
|
|
|
|
|
|
def read_connection_params():
|
|
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
|
parameters = pika.ConnectionParameters(
|
|
host=CONFIG.rabbitmq.host,
|
|
port=CONFIG.rabbitmq.port,
|
|
heartbeat=CONFIG.rabbitmq.heartbeat,
|
|
credentials=credentials,
|
|
)
|
|
return parameters
|
|
|
|
|
|
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
|
|
channel = connection.channel()
|
|
channel.basic_qos(prefetch_count=1)
|
|
return channel
|
|
|
|
|
|
def declare_queue(channel, queue: str):
|
|
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
|
|
return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)
|
|
|
|
|
|
def make_connection() -> pika.BlockingConnection:
|
|
parameters = read_connection_params()
|
|
connection = pika.BlockingConnection(parameters)
|
|
return connection
|
|
|
|
|
|
def build_message_bodies(analyse_container_type):
|
|
def update_message(message_dict):
|
|
if analyse_container_type == "detr" or analyse_container_type == "image":
|
|
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
|
|
if analyse_container_type == "ner":
|
|
message_dict.update(
|
|
{"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"})
|
|
return message_dict
|
|
|
|
storage = get_s3_storage()
|
|
for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket):
|
|
file_id = pdf_name.split(".")[0]
|
|
dossier_id, file_id = file_id.split("/")
|
|
message_dict = {"dossierId": dossier_id, "fileId": file_id}
|
|
update_message(message_dict)
|
|
yield json.dumps(message_dict).encode()
|
|
|
|
|
|
def main(args):
|
|
connection = make_connection()
|
|
channel = make_channel(connection)
|
|
declare_queue(channel, CONFIG.rabbitmq.queues.input)
|
|
declare_queue(channel, CONFIG.rabbitmq.queues.output)
|
|
|
|
for body in build_message_bodies(args.analysis_container):
|
|
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
|
|
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
|
|
|
|
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
|
|
print(f"Received {json.loads(body)}")
|
|
channel.basic_ack(method_frame.delivery_tag)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main(parse_args())
|