From f723bcb9b1a0e0decb3954d95c2feb7fb540ed84 Mon Sep 17 00:00:00 2001 From: "francisco.schulz" Date: Thu, 11 Jul 2024 12:06:59 -0400 Subject: [PATCH] fix(fetch_active_tenants): propper async API call --- scripts/test_async_class.py | 37 ++++++++++++++++--------------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/scripts/test_async_class.py b/scripts/test_async_class.py index f9eee3e..3ac82e7 100644 --- a/scripts/test_async_class.py +++ b/scripts/test_async_class.py @@ -1,4 +1,5 @@ import asyncio +from typing import Set import aiohttp from aio_pika import connect_robust, ExchangeType, Message from aio_pika.abc import AbstractIncomingMessage @@ -23,6 +24,7 @@ class RabbitMQHandler: self.service_request_exchange_name = "service_request_exchange" # INPUT self.service_response_exchange_name = "service_response_exchange" # OUTPUT self.service_dead_letter_queue_name = "service_dlq" + self.queue_expiration_time = 300000 self.connection = None self.channel = None self.tenant_exchange = None @@ -112,29 +114,22 @@ class RabbitMQHandler: tenant_id = message.routing_key await self.output_exchange.publish(Message(body=processed_content.encode()), routing_key=tenant_id) - # FIXME: coroutine error - async def fetch_active_tenants(self): - async with aiohttp.ClientSession() as session: - async with session.get(self.tenant_service_url) as response: - if response.status == 200 and response.headers["content-type"].lower() == "application/json": - tenants = {await tenant["tenantId"] for tenant in response.json()} - return await tenants - else: - print(f"Failed to fetch active tenants. Status: {response.status}") - return set() - - # TODO: remove after fetch_active_tenants is fixed - def get_initial_tenant_ids(self) -> set: - response = requests.get(self.tenant_service_url, timeout=10) - response.raise_for_status() # Raise an HTTPError for bad responses - - if response.headers["content-type"].lower() == "application/json": - tenants = {tenant["tenantId"] for tenant in response.json()} - return tenants - return set() + async def fetch_active_tenants(self) -> Set[str]: + try: + async with aiohttp.ClientSession() as session: + async with session.get(self.tenant_service_url) as response: + if response.status == 200: + data = await response.json() + return {tenant["tenantId"] for tenant in data} + else: + logger.error(f"Failed to fetch active tenants. Status: {response.status}") + return set() + except aiohttp.ClientError as e: + logger.error(f"Error fetching active tenants: {e}") + return set() async def initialize_tenant_queues(self): - active_tenants = self.get_initial_tenant_ids() + active_tenants = await self.fetch_active_tenants() for tenant_id in active_tenants: await self.create_tenant_queues(tenant_id)