diff --git a/pyinfra/examples.py b/pyinfra/examples.py index b5c4355..199a9cf 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -6,10 +6,11 @@ from kn_utils.logging import logger # from threading import Thread from pyinfra.config.loader import get_pyinfra_validators, validate_settings - -# from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager from pyinfra.queue.async_tenants_v2 import RabbitMQConfig, RabbitMQHandler from pyinfra.queue.callback import Callback + +# from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager +from pyinfra.queue.manager import QueueManager from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace from pyinfra.webserver.prometheus import ( add_prometheus_endpoint, @@ -69,25 +70,22 @@ def start_standard_queue_consumer( instrument_pika() instrument_app(app) - # manager = AsyncQueueManager(settings=settings, message_processor=callback) - config = get_rabbitmq_config(settings) - manager = RabbitMQHandler( - config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback - ) + if settings.multiple_tenants.enabled: + config = get_rabbitmq_config(settings) + manager = RabbitMQHandler( + config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback + ) + else: + manager = QueueManager(settings) - # app = add_health_check_endpoint(app, service_manager.is_ready) app = add_health_check_endpoint(app, manager.is_ready) webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread.start() - # thread_t = Thread(target=tenant_manager.start_consuming, daemon=True) - # thread_s = Thread(target=service_manager.start_sequential_basic_get, args=(callback,), daemon=True) - - # thread_t.start() - # thread_s.start() - - # thread_t.join() - # thread_s.join() - - asyncio.run(manager.run()) + if isinstance(manager, RabbitMQHandler): + asyncio.run(manager.run()) + elif isinstance(manager, QueueManager): + manager.start_consuming(callback) + else: + logger.warning(f"Behavior for type {type(manager)} is not defined") diff --git a/pyinfra/queue/async_tenants_v2.py b/pyinfra/queue/async_tenants_v2.py index 6a16543..52f1cff 100644 --- a/pyinfra/queue/async_tenants_v2.py +++ b/pyinfra/queue/async_tenants_v2.py @@ -68,7 +68,7 @@ class RabbitMQHandler: await self.connect() return self.channel.is_open - async def setup_exchanges(self): + async def setup_exchanges(self) -> None: self.tenant_exchange = await self.channel.declare_exchange( self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True )