pyinfra/scripts/mock_client.py
Julius Unverfehrt 6d31cbe635 Pull request #37: RED-4124 PyInfra now tries three times to download an unobtainable object before republishing the message; removed the unreadable stack trace print when a message is put on the dead letter queue, since the exception is already logged where it is raised.
Merge in RR/pyinfra from RED-4124-s3-retry to master

Squashed commit of the following:

commit 40e4dc3712fc692115b8274226430a431bf0192f
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue May 31 12:59:45 2022 +0200

    RED-4124 PyInfra now tries three times to download an unobtainable object before republishing the message; removed the unreadable stack trace print when a message is put on the dead letter queue, since the exception is already logged where it is raised.


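"""Mock client for manually exercising a PyInfra analysis pipeline.

Publishes one message per PDF object found in the given S3 bucket to the
RabbitMQ input queue, then drains the output queue and prints each response.

Example invocation ("my-bucket" stands in for a real bucket name):

    python pyinfra/scripts/mock_client.py -b my-bucket -a ner
"""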
import argparse
import json

import pika

from pyinfra.config import CONFIG
from pyinfra.storage.storages import get_s3_storage


def parse_args():
    """Parse the target S3 bucket and the analysis container type to mock."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket_name", "-b", required=True)
    parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True)
    return parser.parse_args()


def read_connection_params():
    """Build pika connection parameters from the global CONFIG."""
    credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
    parameters = pika.ConnectionParameters(
        host=CONFIG.rabbitmq.host,
        port=CONFIG.rabbitmq.port,
        heartbeat=CONFIG.rabbitmq.heartbeat,
        credentials=credentials,
    )
    return parameters


def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    channel = connection.channel()
    # Hand out at most one unacknowledged message at a time.
    channel.basic_qos(prefetch_count=1)
    return channel


def declare_queue(channel, queue: str):
    # Dead-letter rejected messages to the configured queue via the default ("") exchange.
    args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
    return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)
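

# With the arguments above, RabbitMQ reroutes any message that a consumer
# rejects without requeueing to the dead letter queue. A hypothetical consumer
# would dead-letter a message like this:
#
#     channel.basic_nack(method_frame.delivery_tag, requeue=False)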


def make_connection() -> pika.BlockingConnection:
    parameters = read_connection_params()
    connection = pika.BlockingConnection(parameters)
    return connection


def build_message_bodies(analysis_container_type, bucket_name):
    """Yield one JSON-encoded request message per PDF object in the bucket."""

    def update_message(message_dict):
        if analysis_container_type in ("detr", "image"):
            message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
        if analysis_container_type == "dl_error":
            # Point at a nonexistent object to exercise the download error path.
            message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
        if analysis_container_type == "ner":
            message_dict.update(
                {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
            )
        return message_dict

    storage = get_s3_storage()
    for bucket_name, pdf_name in storage.get_all_object_names(bucket_name):
        if "pdf" not in pdf_name:
            continue
        # Object names look like "<dossierId>/<fileId>.<extension>".
        file_id = pdf_name.split(".")[0]
        dossier_id, file_id = file_id.split("/")
        message_dict = {"dossierId": dossier_id, "fileId": file_id}
        update_message(message_dict)
        yield json.dumps(message_dict).encode()
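

# For "-a ner" the generator above yields bodies of this shape (the IDs here
# are made up):
#
#     {"dossierId": "d1", "fileId": "f1", "targetFileExtension": "TEXT.json.gz",
#      "responseFileExtension": "NER_ENTITIES.json.gz"}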


def main(args):
    connection = make_connection()
    channel = make_channel(connection)
    declare_queue(channel, CONFIG.rabbitmq.queues.input)
    declare_queue(channel, CONFIG.rabbitmq.queues.output)
    for body in build_message_bodies(args.analysis_container, args.bucket_name):
        channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
        print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
    # Drain the output queue until it has been idle for one second.
    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1):
        if not body:
            break
        print(f"Received {json.loads(body)}")
        channel.basic_ack(method_frame.delivery_tag)
    channel.close()
    connection.close()


if __name__ == "__main__":
    main(parse_args())
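

# Note: "-a dl_error" publishes messages whose targetFileExtension points at a
# nonexistent object; per the RED-4124 commit message, the service retries the
# download three times before republishing, and messages that ultimately fail
# end up on the dead letter queue.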