- forward x-tenant-id from queue message header to payload processor - add functions to receive storage infos from an endpoint or the config. This enables hashing and caching of connections created from these infos - add function to initialize storage connections from storage infos - streamline and refactor tests to make them more readable and robust and to make it easier to add new tests - update payload processor with first iteration of multi-tenancy storage connection support with connection caching and backwards compatibility
73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
import gzip
|
|
import json
|
|
import logging
|
|
from operator import itemgetter
|
|
|
|
import pika
|
|
|
|
from pyinfra.config import get_config
|
|
from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
|
|
from pyinfra.storage.storages.s3 import get_s3_storage_from_config
|
|
|
|
# Configuration object shared by every function in this script; loaded once
# at import time.
CONFIG = get_config()

# Install the default root handler, then let INFO-level records through on
# the root logger, which this script logs to directly.
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
|
|
|
|
|
|
def upload_json_and_make_message_body():
    """Upload a gzipped JSON test document and build the matching request body.

    The document is written to ``CONFIG.storage_bucket`` under the object
    name ``dossier/file.json.gz``. ``make_bucket`` is called beforehand when
    ``has_bucket`` returns a falsy value for that bucket.

    Returns:
        dict: Request message with the keys ``dossierId``, ``fileId``,
        ``targetFileExtension`` and ``responseFileExtension``.
    """
    dossier_id = "dossier"
    file_id = "file"
    suffix = "json.gz"

    # Serialise the fixed test content and gzip the UTF-8 bytes.
    payload = json.dumps({"numberOfPages": 7, "sectionTexts": "data"})
    compressed = gzip.compress(payload.encode("utf-8"))

    target_bucket = CONFIG.storage_bucket
    s3 = get_s3_storage_from_config(CONFIG)
    if not s3.has_bucket(target_bucket):
        s3.make_bucket(target_bucket)
    s3.put_object(target_bucket, f"{dossier_id}/{file_id}.{suffix}", compressed)

    return {
        "dossierId": dossier_id,
        "fileId": file_id,
        "targetFileExtension": suffix,
        "responseFileExtension": f"result.{suffix}",
    }
|
|
|
|
|
|
def main():
    """Publish one test request and log every response that comes back.

    Steps: clear the queues, upload the test document, publish the request
    with an ``X-TENANT-ID`` header, then consume ``CONFIG.response_queue``.
    Each response is logged and acknowledged, and the gzipped JSON result
    object it refers to is read from ``CONFIG.storage_bucket`` and logged.
    Consumption stops at the first delivery whose body is empty or ``None``.

    The channel is closed in a ``finally`` clause so it is released even when
    publishing, decoding a response or reading from storage raises.
    """
    development_queue_manager = DevelopmentQueueManager(CONFIG)
    try:
        development_queue_manager.clear_queues()

        message = upload_json_and_make_message_body()

        development_queue_manager.publish_request(message, pika.BasicProperties(headers={"X-TENANT-ID": "redaction"}))
        logger.info(f"Put {message} on {CONFIG.request_queue}")

        storage = get_s3_storage_from_config(CONFIG)
        # The result extension comes from the request we sent, not from the
        # response, so it is the same for every iteration.
        suffix = message["responseFileExtension"]

        # NOTE(review): pika's consume() is documented to yield
        # (None, None, None) once inactivity_timeout seconds pass without a
        # message; that is what ends this loop in the normal case.
        for method_frame, properties, body in development_queue_manager._channel.consume(
            queue=CONFIG.response_queue, inactivity_timeout=15
        ):
            if not body:
                break

            response = json.loads(body)
            logger.info(f"Received {response}")
            logger.info(f"Message headers: {properties.headers}")
            development_queue_manager._channel.basic_ack(method_frame.delivery_tag)

            dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
            object_name = f"{dossier_id}/{file_id}.{suffix}"
            print(object_name)

            result = storage.get_object(CONFIG.storage_bucket, object_name)
            result = json.loads(gzip.decompress(result))
            logger.info(f"Contents of result on storage: {result}")
    finally:
        # Always release the channel, including on errors raised above.
        development_queue_manager.close_channel()
|
|
|
|
|
|
if __name__ == "__main__":
    # Only run the request/response round trip when executed as a script,
    # not when this module is imported.
    main()
|