feat: wip for multiple tenants - for pkg build
This commit is contained in:
parent
9b20a67ace
commit
2da4f37620
@ -250,7 +250,6 @@ class ServiceQueueManager(BaseQueueManager):
|
||||
self.service_response_exchange_name = settings.rabbitmq.service_response_exchange_name
|
||||
|
||||
self.service_request_queue_prefix = settings.rabbitmq.service_request_queue_prefix
|
||||
self.service_response_queue_prefix = settings.rabbitmq.service_response_queue_prefix
|
||||
|
||||
self.service_dlq_name = settings.rabbitmq.service_dlq_name
|
||||
|
||||
@ -276,18 +275,6 @@ class ServiceQueueManager(BaseQueueManager):
|
||||
queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id
|
||||
)
|
||||
|
||||
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
|
||||
self.channel.queue_declare(
|
||||
queue=response_queue_name,
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.service_dlq_name,
|
||||
"x-expires": self.queue_expiration_time, # TODO: check if necessary
|
||||
},
|
||||
)
|
||||
self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id)
|
||||
|
||||
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(requests.exceptions.HTTPError, requests.exceptions.ConnectionError))
|
||||
def get_initial_tenant_ids(self, tenant_endpoint_url: str) -> list:
|
||||
response = requests.get(tenant_endpoint_url, timeout=10)
|
||||
@ -360,9 +347,7 @@ class ServiceQueueManager(BaseQueueManager):
|
||||
try:
|
||||
for tenant_id in self.tenant_ids:
|
||||
request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
|
||||
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
|
||||
self.channel.queue_purge(request_queue_name)
|
||||
self.channel.queue_purge(response_queue_name)
|
||||
logger.info("Queues purged.")
|
||||
except pika.exceptions.ChannelWrongStateError:
|
||||
pass
|
||||
@ -380,18 +365,6 @@ class ServiceQueueManager(BaseQueueManager):
|
||||
)
|
||||
self.channel.queue_bind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id)
|
||||
|
||||
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
|
||||
self.channel.queue_declare(
|
||||
queue=response_queue_name,
|
||||
durable=True,
|
||||
arguments={
|
||||
"x-dead-letter-exchange": "",
|
||||
"x-dead-letter-routing-key": self.service_dlq_name,
|
||||
"x-expires": self.queue_expiration_time, # TODO: check if necessary
|
||||
},
|
||||
)
|
||||
self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id)
|
||||
|
||||
self.tenant_ids.append(tenant_id)
|
||||
logger.debug(f"Added tenant {tenant_id}.")
|
||||
|
||||
@ -400,10 +373,6 @@ class ServiceQueueManager(BaseQueueManager):
|
||||
self.channel.queue_unbind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id)
|
||||
self.channel.queue_delete(request_queue_name)
|
||||
|
||||
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
|
||||
self.channel.queue_unbind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id)
|
||||
self.channel.queue_delete(response_queue_name)
|
||||
|
||||
self.tenant_ids.remove(tenant_id)
|
||||
logger.debug(f"Deleted tenant {tenant_id}.")
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user