Merge in RR/cv-analysis from new_pyinfra to master
Squashed commit of the following:
commit f7a01a90aad1c402ac537de5bdf15df628ad54df
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Jul 27 10:40:59 2022 +0200
fix typo
commit ff4d549fac5b612c2d391ae85823c5eca1e91916
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Jul 27 10:34:04 2022 +0200
adjust build scripts for new pyinfra
commit ecd70f60d46406d8b6cc7f36a1533d706c917ca8
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Jul 27 09:42:55 2022 +0200
simplify logging by using default configurations
commit 20193c14c940eed2b0a7a72058167e26064119d0
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Jul 26 17:16:57 2022 +0200
tidy-up, refactor config logic to not depend on external files
commit d8069cd4d404a570bb04a04278161669d1c83332
Author: Isaac Riley <Isaac.Riley@iqser.com>
Date: Tue Jul 26 15:14:59 2022 +0200
update pyinfra
commit c3bc11037cca9baf016043ab997c566f5b4a2586
Author: Isaac Riley <Isaac.Riley@iqser.com>
Date: Tue Jul 26 15:09:14 2022 +0200
repair tests
commit 6f4e4f2863ee16ae056c1d432f663858c5f10221
Author: Isaac Riley <Isaac.Riley@iqser.com>
Date: Tue Jul 26 14:52:38 2022 +0200
updated server logic to work with new pyinfra; update scripts for pyinfra as submodule
commit 2a18dba81de5ee84d0bdf0e77f478693e8d8aef4
Author: Isaac Riley <Isaac.Riley@iqser.com>
Date: Tue Jul 26 14:10:41 2022 +0200
formatting
commit d87ce9328de9aa2341228af9b24473d5e583504e
Author: Isaac Riley <Isaac.Riley@iqser.com>
Date: Tue Jul 26 14:10:11 2022 +0200
make server logic compatible with new pyinfra
85 lines
2.5 KiB
Python
85 lines
2.5 KiB
Python
import argparse
|
|
import json
|
|
|
|
import pika
|
|
|
|
from pyinfra.config import get_config
|
|
from pyinfra.storage.storage import get_s3_storage
|
|
|
|
# Module-level configuration obtained from pyinfra's get_config(). The functions
# below read the RabbitMQ settings, queue names and storage bucket from it.
CONFIG = get_config()
|
|
|
|
|
|
def parse_args():
    """Parse the command-line options of this script.

    Returns:
        The parsed namespace. Its ``operation`` attribute is one of
        "table", "layout" or "figure"; the option is mandatory.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--operation",
        "-o",
        choices=["table", "layout", "figure"],
        required=True,
    )
    return cli.parse_args()
|
|
|
|
|
|
def read_connection_params():
    """Build the pika connection parameters from the RabbitMQ settings in CONFIG.

    Returns:
        A ``pika.ConnectionParameters`` carrying host, port, heartbeat and
        plain username/password credentials.
    """
    auth = pika.PlainCredentials(CONFIG.rabbitmq_username, CONFIG.rabbitmq_password)
    return pika.ConnectionParameters(
        host=CONFIG.rabbitmq_host,
        # NOTE(review): only the heartbeat is coerced to int; the port is
        # passed through exactly as configured.
        port=CONFIG.rabbitmq_port,
        heartbeat=int(CONFIG.rabbitmq_heartbeat),
        credentials=auth,
    )
|
|
|
|
|
|
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    """Open a channel on ``connection`` with a prefetch count of one.

    Args:
        connection: connection object on which ``channel()`` is called.

    Returns:
        The newly opened channel.
    """
    opened = connection.channel()
    # Limit delivery to a single unacknowledged message at a time.
    opened.basic_qos(prefetch_count=1)
    return opened
|
|
|
|
|
|
def declare_queue(channel, queue: str):
    """Declare a durable, non-auto-deleted queue with dead-lettering configured.

    Args:
        channel: channel on which the declaration is issued.
        queue: name of the queue to declare.

    Returns:
        Whatever ``channel.queue_declare`` returns.
    """
    # Dead-lettered messages use the "" exchange with CONFIG.dead_letter_queue
    # as their routing key.
    dead_letter_arguments = {
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": CONFIG.dead_letter_queue,
    }
    return channel.queue_declare(
        queue=queue,
        auto_delete=False,
        durable=True,
        arguments=dead_letter_arguments,
    )
|
|
|
|
|
|
def make_connection() -> pika.BlockingConnection:
    """Open a blocking RabbitMQ connection using ``read_connection_params()``."""
    return pika.BlockingConnection(read_connection_params())
|
|
|
|
|
|
def build_message_bodies(operation, bucket_name):
    """Yield one encoded request message per "pdf" object found in a bucket.

    Object names are expected to look like ``<dossier_id>/<file_id>.<ext...>``:
    everything before the first "." is split on "/" into the dossier id and
    the file id. Names that do not contain "pdf" are ignored silently. Names
    whose stem does not contain exactly one "/" are reported and skipped, so
    a single unexpected object no longer aborts the whole run with a
    ValueError.

    Args:
        operation: operation to request. It is written to the message as-is
            and, upper-cased, forms the response file extension.
        bucket_name: bucket whose object names are listed.

    Yields:
        bytes: the JSON-serialised message, encoded with ``str.encode()``.
    """
    storage = get_s3_storage(CONFIG)
    # The listing yields (bucket, object name) pairs. The bucket component is
    # not needed here and must not overwrite the ``bucket_name`` parameter.
    for _, object_name in storage.get_all_object_names(bucket_name):
        if "pdf" not in object_name:
            continue

        stem = object_name.split(".")[0]
        dossier_id, separator, file_id = stem.partition("/")
        if not separator or "/" in file_id:
            print(f"Skipping {object_name}: expected '<dossier_id>/<file_id>.<extension>'")
            continue

        message = {
            "dossierId": dossier_id,
            "fileId": file_id,
            "targetFileExtension": "ORIGIN.pdf.gz",
            "responseFileExtension": f"{operation.upper()}.json.gz",
            "operation": operation,
        }
        yield json.dumps(message).encode()
|
|
|
|
|
|
def main(args):
    """Publish one request per stored PDF, then print the responses that arrive.

    Declares the request and response queues, publishes every message from
    ``build_message_bodies`` to the request queue, and then consumes the
    response queue until no message arrives within the inactivity timeout.
    The connection is always closed on exit, including when publishing or
    consuming raises.

    Args:
        args: parsed command-line namespace; only ``args.operation`` is read.
    """
    connection = make_connection()
    try:
        channel = make_channel(connection)
        declare_queue(channel, CONFIG.request_queue)
        declare_queue(channel, CONFIG.response_queue)

        # Publish through the "" exchange, using the queue name as routing key.
        for body in build_message_bodies(args.operation, CONFIG.storage_bucket):
            channel.basic_publish("", CONFIG.request_queue, body)
            print(f"Put {body} on {CONFIG.request_queue}")

        # With inactivity_timeout set, consume() yields an empty
        # (None, None, None) item when no message arrives in time; that
        # empty body ends the loop.
        for method_frame, _, body in channel.consume(queue=CONFIG.response_queue, inactivity_timeout=1):
            if not body:
                break
            print(f"Received {json.loads(body)}")
            channel.basic_ack(method_frame.delivery_tag)

        channel.close()
    finally:
        # Release the broker connection on every exit path. The is_open
        # check avoids a second error if the connection already went away.
        if connection.is_open:
            connection.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse the CLI options, then run the round trip.
    cli_args = parse_args()
    main(cli_args)
|