feat: add backwards compatibility
This commit is contained in:
parent
8844df44ce
commit
8ac16de0fa
@ -6,10 +6,11 @@ from kn_utils.logging import logger
|
||||
|
||||
# from threading import Thread
|
||||
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
|
||||
|
||||
# from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager
|
||||
from pyinfra.queue.async_tenants_v2 import RabbitMQConfig, RabbitMQHandler
|
||||
from pyinfra.queue.callback import Callback
|
||||
|
||||
# from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager
|
||||
from pyinfra.queue.manager import QueueManager
|
||||
from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace
|
||||
from pyinfra.webserver.prometheus import (
|
||||
add_prometheus_endpoint,
|
||||
@ -69,25 +70,22 @@ def start_standard_queue_consumer(
|
||||
instrument_pika()
|
||||
instrument_app(app)
|
||||
|
||||
# manager = AsyncQueueManager(settings=settings, message_processor=callback)
|
||||
config = get_rabbitmq_config(settings)
|
||||
manager = RabbitMQHandler(
|
||||
config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback
|
||||
)
|
||||
if settings.multiple_tenants.enabled:
|
||||
config = get_rabbitmq_config(settings)
|
||||
manager = RabbitMQHandler(
|
||||
config=config, tenant_service_url=settings.storage.tenant_server.endpoint, message_processor=callback
|
||||
)
|
||||
else:
|
||||
manager = QueueManager(settings)
|
||||
|
||||
# app = add_health_check_endpoint(app, service_manager.is_ready)
|
||||
app = add_health_check_endpoint(app, manager.is_ready)
|
||||
|
||||
webserver_thread = create_webserver_thread_from_settings(app, settings)
|
||||
webserver_thread.start()
|
||||
|
||||
# thread_t = Thread(target=tenant_manager.start_consuming, daemon=True)
|
||||
# thread_s = Thread(target=service_manager.start_sequential_basic_get, args=(callback,), daemon=True)
|
||||
|
||||
# thread_t.start()
|
||||
# thread_s.start()
|
||||
|
||||
# thread_t.join()
|
||||
# thread_s.join()
|
||||
|
||||
asyncio.run(manager.run())
|
||||
if isinstance(manager, RabbitMQHandler):
|
||||
asyncio.run(manager.run())
|
||||
elif isinstance(manager, QueueManager):
|
||||
manager.start_consuming(callback)
|
||||
else:
|
||||
logger.warning(f"Behavior for type {type(manager)} is not defined")
|
||||
|
||||
@ -68,7 +68,7 @@ class RabbitMQHandler:
|
||||
await self.connect()
|
||||
return self.channel.is_open
|
||||
|
||||
async def setup_exchanges(self):
|
||||
async def setup_exchanges(self) -> None:
|
||||
self.tenant_exchange = await self.channel.declare_exchange(
|
||||
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user