From 66aaeca92801f9cca89b232f42f117265a34e6d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Wed, 24 Jul 2024 17:28:13 +0200 Subject: [PATCH] fix: async queue test --- scripts/send_async_request.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/scripts/send_async_request.py b/scripts/send_async_request.py index 4b44cf4..931b2b8 100644 --- a/scripts/send_async_request.py +++ b/scripts/send_async_request.py @@ -9,8 +9,7 @@ from aio_pika.abc import AbstractIncomingMessage from kn_utils.logging import logger from pyinfra.config.loader import load_settings, local_pyinfra_root_path -from pyinfra.examples import get_rabbitmq_config -from pyinfra.queue.async_manager import AsyncQueueManager +from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings settings = load_settings(local_pyinfra_root_path / "config/") @@ -89,7 +88,21 @@ def upload_json_and_make_message_body(tenant_id: str): async def test_rabbitmq_handler() -> None: tenant_service_url = settings.storage.tenant_server.endpoint - config = get_rabbitmq_config(settings) + config = RabbitMQConfig( + host=settings.rabbitmq.host, + port=settings.rabbitmq.port, + username=settings.rabbitmq.username, + password=settings.rabbitmq.password, + heartbeat=settings.rabbitmq.heartbeat, + input_queue_prefix=settings.rabbitmq.service_request_queue_prefix, + tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix, + tenant_exchange_name=settings.rabbitmq.tenant_exchange_name, + service_request_exchange_name=settings.rabbitmq.service_request_exchange_name, + service_response_exchange_name=settings.rabbitmq.service_response_exchange_name, + service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name, + queue_expiration_time=settings.rabbitmq.queue_expiration_time, + pod_name=settings.kubernetes.pod_name, + ) handler = AsyncQueueManager(config, tenant_service_url, dummy_message_processor) @@ -104,7 +117,7 @@ async def test_rabbitmq_handler() -> None: Message(body=json.dumps(create_message).encode()), routing_key="tenant.created" ) logger.info(f"Sent create tenant message for {tenant_id}") - await asyncio.sleep(2) # Wait for queue creation + await asyncio.sleep(0.5) # Wait for queue creation # Prepare service request service_request, storage = upload_json_and_make_message_body(tenant_id) @@ -128,7 +141,7 @@ async def test_rabbitmq_handler() -> None: Message(body=json.dumps(delete_message).encode()), routing_key="tenant.delete" ) logger.info(f"Sent delete tenant message for {tenant_id}") - await asyncio.sleep(2) # Wait for queue deletion + await asyncio.sleep(0.5) # Wait for queue deletion await handler.connection.close()