pyinfra/scripts/mock_client.py
Kresnadi Budisantoso 1363030b95 Pull request #27: Add parser for bucket/container env vars
Merge in RR/pyinfra from kbudisantoso/configyaml-1650538128334 to master

Squashed commit of the following:

commit 6103b7720315aaef3d98aea8f3c817477bbf500b
Merge: 69ac65a 3b91185
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Apr 21 14:45:10 2022 +0200

    Merge remote-tracking branch 'origin' into kbudisantoso/configyaml-1650538128334

commit 69ac65ae1bd4095c797112c6f9530f0b1705277e
Merge: 9a1cd07 a00ceae
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Apr 21 14:37:34 2022 +0200

    Merge remote-tracking branch 'origin' into kbudisantoso/configyaml-1650538128334

commit 9a1cd07c09e5ee2618f2c1a3c27b69c67b1eaeb0
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Apr 21 14:35:49 2022 +0200

    test done

commit e7127e8af937fe067f1f92eb688187ebbe609478
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Apr 21 14:32:25 2022 +0200

    test

commit 262957e33d19dbafb3f10b5a32c438460b966a88
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Apr 21 14:16:33 2022 +0200

    add parser for env var storage_buckets/containers

commit 3535002b4aac9f297bdbe112b04f537cef25f5c2
Author: Kresnadi Budisantoso <kresnadi.budisantoso@iqser.com>
Date:   Thu Apr 21 12:48:52 2022 +0200

    config.yaml online editiert mit Bitbucket
2022-04-21 14:55:01 +02:00

84 lines
2.8 KiB
Python

import argparse
import json
import pika
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.storage.storages import get_s3_storage
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True)
args = parser.parse_args()
return args
def read_connection_params():
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(
host=CONFIG.rabbitmq.host,
port=CONFIG.rabbitmq.port,
heartbeat=CONFIG.rabbitmq.heartbeat,
credentials=credentials,
)
return parameters
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
return channel
def declare_queue(channel, queue: str):
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter}
return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)
def make_connection() -> pika.BlockingConnection:
parameters = read_connection_params()
connection = pika.BlockingConnection(parameters)
return connection
def build_message_bodies(analyse_container_type):
def update_message(message_dict):
if analyse_container_type == "detr" or analyse_container_type == "image":
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
if analyse_container_type == "ner":
message_dict.update(
{"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
)
return message_dict
storage = get_s3_storage()
for bucket_name, pdf_name in storage.get_all_object_names(parse_disjunction_string(CONFIG.storage.bucket)):
if "pdf" not in pdf_name:
continue
file_id = pdf_name.split(".")[0]
dossier_id, file_id = file_id.split("/")
message_dict = {"dossierId": dossier_id, "fileId": file_id}
update_message(message_dict)
yield json.dumps(message_dict).encode()
def main(args):
connection = make_connection()
channel = make_channel(connection)
declare_queue(channel, CONFIG.rabbitmq.queues.input)
declare_queue(channel, CONFIG.rabbitmq.queues.output)
for body in build_message_bodies(args.analysis_container):
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
print(f"Received {json.loads(body)}")
channel.basic_ack(method_frame.delivery_tag)
channel.close()
if __name__ == "__main__":
main(parse_args())