import gzip import json import logging from operator import itemgetter import pika from pyinfra.config import get_config from pyinfra.queue.development_queue_manager import DevelopmentQueueManager from pyinfra.storage.storages.s3 import get_s3_storage_from_config CONFIG = get_config() logging.basicConfig() logger = logging.getLogger() logger.setLevel(logging.INFO) def upload_json_and_make_message_body(): bucket = CONFIG.storage_bucket dossier_id, file_id, suffix = "dossier", "file", "json.gz" content = { "numberOfPages": 7, "sectionTexts": "data", } object_name = f"{dossier_id}/{file_id}.{suffix}" data = gzip.compress(json.dumps(content).encode("utf-8")) storage = get_s3_storage_from_config(CONFIG) if not storage.has_bucket(bucket): storage.make_bucket(bucket) storage.put_object(bucket, object_name, data) message_body = { "dossierId": dossier_id, "fileId": file_id, "targetFileExtension": suffix, "responseFileExtension": f"result.{suffix}", } return message_body def main(): development_queue_manager = DevelopmentQueueManager(CONFIG) development_queue_manager.clear_queues() message = upload_json_and_make_message_body() development_queue_manager.publish_request(message, pika.BasicProperties(headers={"X-TENANT-ID": "redaction"})) logger.info(f"Put {message} on {CONFIG.request_queue}") storage = get_s3_storage_from_config(CONFIG) for method_frame, properties, body in development_queue_manager._channel.consume( queue=CONFIG.response_queue, inactivity_timeout=15 ): if not body: break response = json.loads(body) logger.info(f"Received {response}") logger.info(f"Message headers: {properties.headers}") development_queue_manager._channel.basic_ack(method_frame.delivery_tag) dossier_id, file_id = itemgetter("dossierId", "fileId")(response) suffix = message["responseFileExtension"] print(f"{dossier_id}/{file_id}.{suffix}") result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}") result = json.loads(gzip.decompress(result)) logger.info(f"Contents of result on storage: {result}") development_queue_manager.close_channel() if __name__ == "__main__": main()