"""Manual test script: publishes tenant events and service requests to RabbitMQ,
uploads a gzipped JSON payload to S3, and consumes the service response."""

import asyncio
import gzip
import json
from operator import itemgetter

from aio_pika.abc import AbstractIncomingMessage

from kn_utils.logging import logger
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.async_tenants import AsyncQueueManager
from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings

settings = load_settings(local_pyinfra_root_path / "config/")


def upload_json_and_make_message_body(tenant_id: str) -> dict:
    """Upload a gzipped JSON test file to S3 and return the matching request message body."""
    dossier_id, file_id, suffix = "dossier", "file", "json.gz"
    content = {
        "numberOfPages": 7,
        "sectionTexts": "data",
    }
    object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
    data = gzip.compress(json.dumps(content).encode("utf-8"))

    storage = get_s3_storage_from_settings(settings)
    if not storage.has_bucket():
        storage.make_bucket()
    storage.put_object(object_name, data)

    return {
        "tenantId": tenant_id,
        "dossierId": dossier_id,
        "fileId": file_id,
        "targetFileExtension": suffix,
        "responseFileExtension": f"result.{suffix}",
    }


def tenant_event_message(tenant_id: str) -> dict:
    return {"tenantId": tenant_id}


def on_message_callback(storage: S3Storage):
    """Build a response-queue callback that fetches and logs the result object from S3."""

    async def on_message(message: AbstractIncomingMessage) -> None:
        # message.process() acks on clean exit, so no explicit message.ack() is
        # needed here (acking twice would raise a MessageProcessError).
        async with message.process():
            if not message.body:
                raise ValueError("Received a message with an empty body.")
            response = json.loads(message.body)
            logger.info(f"Received {response}")
            logger.info(f"Message headers: {message.properties.headers}")

            tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
            suffix = response["responseFileExtension"]
            result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
            result = json.loads(gzip.decompress(result))
            logger.info(f"Contents of result on storage: {result}")

    return on_message


async def send_tenant_event(queue_manager: AsyncQueueManager, tenant_id: str, event_type: str):
    """Publish a tenant 'create' or 'delete' event for the given tenant."""
    await queue_manager.purge_queues()
    message = tenant_event_message(tenant_id)
    if event_type == "create":
        await queue_manager.publish_message_to_tenant_created_queue(message=message)
    elif event_type == "delete":
        await queue_manager.publish_message_to_tenant_deleted_queue(message=message)
    else:
        logger.warning(f"Event type '{event_type}' not known.")
    await queue_manager.stop_consumers()


async def send_service_request(queue_manager: AsyncQueueManager, tenant_id: str):
    """Publish a service request for the tenant, then consume the response queue."""
    request_queue_name = f"{settings.rabbitmq.service_request_queue_prefix}_{tenant_id}"
    await queue_manager.purge_queues()
    message = upload_json_and_make_message_body(tenant_id)
    await queue_manager.publish_message_to_input_queue(tenant_id=tenant_id, message=message)
    logger.info(f"Put {message} on {request_queue_name}.")

    storage = get_s3_storage_from_settings(settings)
    response_queue_name = f"{settings.rabbitmq.service_response_queue_prefix}_{tenant_id}"
    service_response_queue = await queue_manager.channel.get_queue(name=response_queue_name)
    # Give the service time to process the request; asyncio.sleep keeps the
    # event loop running, whereas time.sleep would block it entirely.
    await asyncio.sleep(10)
    await service_response_queue.consume(callback=on_message_callback(storage))
    await queue_manager.stop_consumers()


if __name__ == "__main__":
    # tenant_ids = ["a", "b", "c", "d"]
    queue_manager = AsyncQueueManager(settings)
    # asyncio.run(send_tenant_event(queue_manager, "test", "create"))
    asyncio.run(send_service_request(queue_manager, "redaction"))