pyinfra/scripts/send_request.py
Julius Unverfehrt fff5be2e50 feat(settings): improve config loading logic
Load settings from .toml files, .env and environment variables. Also ensures a ROOT_PATH environment variable is
set. If ROOT_PATH is not set and no root_path argument is passed, the current working directory is used as root.
Settings paths can be a single .toml file, a folder containing .toml files or a list of .toml files and folders.
If a folder is passed, all .toml files in the folder are loaded. If settings path is None, only .env and
environment variables are loaded. If the settings paths are relative, they are joined with the root_path argument.
2024-01-30 12:56:58 +01:00

68 lines
2.1 KiB
Python

import gzip
import json
from operator import itemgetter
from kn_utils.logging import logger
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
# Module-level settings used by the functions below; loaded once at import
# time from the "config/" path under the local pyinfra root.
settings = load_settings(local_pyinfra_root_path / "config/")
def upload_json_and_make_message_body():
    """Upload a gzipped JSON fixture and build the request message for it.

    A small fixed JSON document is serialized, gzip-compressed and stored
    under ``dossier/file.json.gz`` in the storage obtained from the
    module-level ``settings``. The bucket is created first when
    ``has_bucket()`` reports that it does not exist.

    Returns:
        A dict with the keys ``dossierId``, ``fileId``,
        ``targetFileExtension`` and ``responseFileExtension`` describing the
        uploaded object and the extension expected for the result.
    """
    dossier = "dossier"
    file_name = "file"
    extension = "json.gz"

    # Serialize and compress the fixture before touching the storage backend.
    serialized = json.dumps({"numberOfPages": 7, "sectionTexts": "data"})
    compressed = gzip.compress(serialized.encode("utf-8"))

    storage = get_s3_storage_from_settings(settings)
    if not storage.has_bucket():
        storage.make_bucket()
    storage.put_object(f"{dossier}/{file_name}.{extension}", compressed)

    return {
        "dossierId": dossier,
        "fileId": file_name,
        "targetFileExtension": extension,
        "responseFileExtension": f"result.{extension}",
    }
def main():
    """Send one sample request through the queues and log the stored result.

    Steps:
      1. Purge the queues and upload the sample payload.
      2. Publish the matching request message to the input queue.
      3. Consume the output queue; for each response, acknowledge it, fetch
         the referenced result object from storage, decompress it and log
         its contents.

    Consumption ends as soon as an item with a falsy body is yielded, which
    is what the 15-second ``inactivity_timeout`` is expected to produce.
    ``stop_consuming`` is always called afterwards, even when handling a
    response raises.
    """
    queue_manager = QueueManager(settings)
    queue_manager.purge_queues()

    message = upload_json_and_make_message_body()

    queue_manager.publish_message_to_input_queue(message)
    logger.info(f"Put {message} on {settings.rabbitmq.input_queue}.")

    storage = get_s3_storage_from_settings(settings)
    # The response extension is taken from the request message, so it does
    # not change between responses and is looked up once.
    suffix = message["responseFileExtension"]

    try:
        for method_frame, properties, body in queue_manager.channel.consume(
            queue=settings.rabbitmq.output_queue, inactivity_timeout=15
        ):
            # NOTE(review): presumably the consumer yields an empty item
            # once the inactivity timeout elapses -- confirm against the
            # channel implementation.
            if not body:
                break

            response = json.loads(body)
            logger.info(f"Received {response}")
            logger.info(f"Message headers: {properties.headers}")
            queue_manager.channel.basic_ack(method_frame.delivery_tag)

            dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
            object_name = f"{dossier_id}/{file_id}.{suffix}"
            print(object_name)

            result = storage.get_object(object_name)
            result = json.loads(gzip.decompress(result))
            logger.info(f"Contents of result on storage: {result}")
    finally:
        # Run the consumer cleanup even if decoding a response or fetching
        # the result object fails.
        queue_manager.stop_consuming()
# Run the round trip only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()