"""Manual end-to-end exercise of ``AsyncQueueManager``.

When run as a script this module connects to RabbitMQ using the loaded
settings, announces a test tenant, uploads a small gzip-compressed JSON object
to storage, publishes a service request for it, consumes the response and
finally announces the tenant's deletion.
"""

import asyncio
import gzip
import json
from operator import itemgetter
from typing import Any, Dict

from aio_pika import Message
from aio_pika.abc import AbstractIncomingMessage
from kn_utils.logging import logger

from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings

settings = load_settings(local_pyinfra_root_path / "config/")


async def dummy_message_processor(message: Dict[str, Any]) -> Dict[str, Any]:
    """Process a service request by transforming its stored payload.

    The gzip-compressed JSON object at
    ``{tenantId}/{dossierId}/{fileId}.{targetFileExtension}`` is read from
    storage, a derived document is built from its ``numberOfPages`` and
    ``sectionTexts`` entries, and the result is written (again as
    gzip-compressed JSON) to
    ``{tenantId}/{dossierId}/{fileId}.{responseFileExtension}``.

    Args:
        message: Request body; must contain ``tenantId``, ``dossierId``,
            ``fileId``, ``targetFileExtension`` and ``responseFileExtension``.

    Returns:
        A shallow copy of ``message`` with ``processed`` set to ``True`` and a
        ``processor_message`` note added.

    Raises:
        KeyError: If a required key is missing from ``message`` or from the
            stored payload.
    """
    logger.info(f"Processing message: {message}")
    # await asyncio.sleep(1)  # Simulate processing time

    storage = get_s3_storage_from_settings(settings)
    tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(message)
    suffix = message["responseFileExtension"]

    object_name = f"{tenant_id}/{dossier_id}/{file_id}.{message['targetFileExtension']}"
    original_content = json.loads(gzip.decompress(storage.get_object(object_name)))

    processed_content = {
        "processedPages": original_content["numberOfPages"],
        "processedSectionTexts": f"Processed: {original_content['sectionTexts']}",
    }

    processed_object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
    processed_data = gzip.compress(json.dumps(processed_content).encode("utf-8"))
    storage.put_object(processed_object_name, processed_data)

    # Copy so the caller's dict is not mutated by the added status fields.
    processed_message = message.copy()
    processed_message["processed"] = True
    processed_message["processor_message"] = "This message was processed by the dummy processor"

    logger.info(f"Finished processing message. Result: {processed_message}")
    return processed_message


async def on_response_message_callback(storage: S3Storage):
    """Build a consumer callback that logs a response and its stored result.

    Args:
        storage: Storage the callback reads the result object from.

    Returns:
        A coroutine function accepting an incoming message, suitable for
        passing as ``callback`` to a queue's ``consume``.
    """

    async def on_message(message: AbstractIncomingMessage) -> None:
        """Acknowledge ``message`` and log the result object it refers to.

        Raises:
            ValueError: If the message body is empty.
        """
        # ignore_processed=True because the message is acked explicitly below;
        # the context manager must not try to ack it a second time on exit.
        async with message.process(ignore_processed=True):
            if not message.body:
                raise ValueError("Received response message with an empty body")

            response = json.loads(message.body)
            logger.info(f"Received {response}")
            logger.info(f"Message headers: {message.properties.headers}")
            await message.ack()

            tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
            suffix = response["responseFileExtension"]

            result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
            result = json.loads(gzip.decompress(result))
            logger.info(f"Contents of result on storage: {result}")

    return on_message


def upload_json_and_make_message_body(tenant_id: str):
    """Upload a sample payload and build the matching service request body.

    A fixed two-field JSON document is gzip-compressed and stored under
    ``{tenant_id}/dossier/file.json.gz``; the bucket is created first if it
    does not exist yet.

    Args:
        tenant_id: Tenant identifier used as the object-name prefix and as the
            ``tenantId`` of the request.

    Returns:
        A ``(message_body, storage)`` tuple: the request dict describing the
        uploaded object and the storage instance that was used for the upload.
    """
    dossier_id, file_id, suffix = "dossier", "file", "json.gz"
    content = {
        "numberOfPages": 7,
        "sectionTexts": "data",
    }

    object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
    data = gzip.compress(json.dumps(content).encode("utf-8"))

    storage = get_s3_storage_from_settings(settings)
    if not storage.has_bucket():
        storage.make_bucket()
    storage.put_object(object_name, data)

    message_body = {
        "tenantId": tenant_id,
        "dossierId": dossier_id,
        "fileId": file_id,
        "targetFileExtension": suffix,
        "responseFileExtension": f"result.{suffix}",
    }
    return message_body, storage


async def test_rabbitmq_handler() -> None:
    """Run the tenant-create / request / response / tenant-delete scenario.

    The broker connection is closed when the scenario finishes, whether it
    completed or raised.
    """
    tenant_service_url = settings.storage.tenant_server.endpoint

    config = RabbitMQConfig(
        host=settings.rabbitmq.host,
        port=settings.rabbitmq.port,
        username=settings.rabbitmq.username,
        password=settings.rabbitmq.password,
        heartbeat=settings.rabbitmq.heartbeat,
        input_queue_prefix=settings.rabbitmq.service_request_queue_prefix,
        tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix,
        tenant_exchange_name=settings.rabbitmq.tenant_exchange_name,
        service_request_exchange_name=settings.rabbitmq.service_request_exchange_name,
        service_response_exchange_name=settings.rabbitmq.service_response_exchange_name,
        service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name,
        queue_expiration_time=settings.rabbitmq.queue_expiration_time,
        pod_name=settings.kubernetes.pod_name,
    )

    handler = AsyncQueueManager(config, tenant_service_url, dummy_message_processor)

    await handler.connect()
    # From here on a connection exists; make sure it is closed even when one
    # of the steps below raises, so a failed run does not leak it.
    try:
        await handler.setup_exchanges()

        tenant_id = "test_tenant"

        # Test tenant creation
        create_message = {"tenantId": tenant_id}
        await handler.tenant_exchange.publish(
            Message(body=json.dumps(create_message).encode()), routing_key="tenant.created"
        )
        logger.info(f"Sent create tenant message for {tenant_id}")
        await asyncio.sleep(0.5)  # Wait for queue creation

        # Prepare service request
        service_request, storage = upload_json_and_make_message_body(tenant_id)

        # Test service request
        await handler.input_exchange.publish(Message(body=json.dumps(service_request).encode()), routing_key=tenant_id)
        logger.info(f"Sent service request for {tenant_id}")
        await asyncio.sleep(5)  # Wait for message processing

        # Consume service request
        response_queue = await handler.channel.declare_queue(name=f"response_queue_{tenant_id}")
        await response_queue.bind(exchange=handler.output_exchange, routing_key=tenant_id)
        callback = await on_response_message_callback(storage)
        await response_queue.consume(callback=callback)

        await asyncio.sleep(5)  # Wait for message processing

        # Test tenant deletion
        delete_message = {"tenantId": tenant_id}
        await handler.tenant_exchange.publish(
            Message(body=json.dumps(delete_message).encode()), routing_key="tenant.delete"
        )
        logger.info(f"Sent delete tenant message for {tenant_id}")
        await asyncio.sleep(0.5)  # Wait for queue deletion
    finally:
        await handler.connection.close()


if __name__ == "__main__":
    asyncio.run(test_rabbitmq_handler())