"""Manual integration script: exercises tenant lifecycle queues and S3 storage."""
import gzip
|
|
import json
|
|
import time
|
|
from operator import itemgetter
|
|
from threading import Thread
|
|
|
|
from kn_utils.logging import logger
|
|
|
|
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
|
|
from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager
|
|
from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
|
|
|
|
# Load pyinfra settings from the repo-local config directory; shared by every helper below.
settings = load_settings(local_pyinfra_root_path / "config/")
|
|
|
|
|
|
def upload_json_and_make_message_body(tenant_id: str):
    """Upload a small gzipped JSON fixture to storage and return the matching queue message.

    The object is written under ``<tenant>/<dossier>/<file>.<suffix>`` and the
    returned dict carries the identifiers a consumer needs to locate it.
    """
    dossier_id = "dossier"
    file_id = "file"
    suffix = "json.gz"

    payload = {"numberOfPages": 7, "sectionTexts": "data"}
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))

    # Ensure the bucket exists before writing the fixture object.
    storage = get_s3_storage_from_settings(settings)
    if not storage.has_bucket():
        storage.make_bucket()
    storage.put_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}", compressed)

    return {
        "tenantId": tenant_id,
        "dossierId": dossier_id,
        "fileId": file_id,
        "targetFileExtension": suffix,
        "responseFileExtension": f"result.{suffix}",
    }
|
|
|
|
|
|
def tenant_event_message(tenant_id: str):
    """Build the minimal message body for a tenant lifecycle event."""
    message = {"tenantId": tenant_id}
    return message
|
|
|
|
|
|
def send_tenant_event(tenant_id: str, event_type: str):
    """Publish a tenant lifecycle event ("create" or "delete") to the tenant queues.

    Unknown event types are logged and otherwise ignored. The queue manager is
    always shut down via ``stop_consuming`` — previously a publish failure would
    skip the shutdown and leak the connection.
    """
    queue_manager = TenantQueueManager(settings)
    queue_manager.purge_queues()
    message = tenant_event_message(tenant_id)
    try:
        if event_type == "create":
            queue_manager.publish_message_to_tenant_created_queue(message=message)
        elif event_type == "delete":
            queue_manager.publish_message_to_tenant_deleted_queue(message=message)
        else:
            logger.warning(f"Event type '{event_type}' not known.")
    finally:
        # Release the queue connection even when publishing raises.
        queue_manager.stop_consuming()
|
|
|
|
|
|
def send_service_request(tenant_id: str):
    """Publish one service request for *tenant_id* and wait for its response.

    Uploads a gzipped JSON fixture, publishes the matching message to the
    tenant's input queue, then blocks (up to 15s of inactivity) for a single
    response message, fetches the result object from storage, and logs it.

    Fixes vs. previous revision: the stray ``print`` is now a ``logger.info``
    (consistent with the rest of the file), and ``stop_consuming`` runs in a
    ``finally`` so a failure mid-consume no longer leaks the channel.
    """
    queue_manager = ServiceQueueManager(settings)
    queue_name = f"service_response_queue_{tenant_id}"

    queue_manager.purge_queues()

    message = upload_json_and_make_message_body(tenant_id)
    queue_manager.publish_message_to_input_queue(tenant_id=tenant_id, message=message)
    logger.info(f"Put {message} on {queue_name}.")

    storage = get_s3_storage_from_settings(settings)

    try:
        for method_frame, properties, body in queue_manager.channel.consume(
            queue=queue_name, inactivity_timeout=15
        ):
            # On inactivity timeout the consume generator yields an empty frame.
            if not body:
                break
            response = json.loads(body)
            logger.info(f"Received {response}")
            logger.info(f"Message headers: {properties.headers}")
            queue_manager.channel.basic_ack(method_frame.delivery_tag)

            # Note: deliberately rebinds tenant_id from the response payload.
            tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
            suffix = message["responseFileExtension"]
            object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
            logger.info(object_name)
            result = json.loads(gzip.decompress(storage.get_object(object_name)))
            logger.info(f"Contents of result on storage: {result}")
            break  # only a single response is expected per request
    finally:
        queue_manager.stop_consuming()
|
|
|
|
|
|
if __name__ == "__main__":
    import uuid

    # Fresh random tenant ids so repeated runs never collide with earlier data.
    unique_ids = [str(uuid.uuid4()) for _ in range(100)]

    # Manual smoke test: publish a "create" event for every tenant.
    for tenant in unique_ids:
        send_tenant_event(tenant_id=tenant, event_type="create")

    # Optional follow-up steps, disabled by default.
    # NOTE(review): these previously referenced a nonexistent `tenant_ids`;
    # use `unique_ids` when re-enabling.
    # for tenant in unique_ids:
    #     send_service_request(tenant_id=tenant)

    # for tenant in unique_ids:
    #     send_tenant_event(tenant_id=tenant, event_type="delete")
|