From 2da4f37620ab0287e6d417c63f0cf27634886e79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Thu, 11 Jul 2024 12:49:07 +0200 Subject: [PATCH] feat: wip for multiple tenants - for pkg build --- pyinfra/queue/threaded_tenants.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/pyinfra/queue/threaded_tenants.py b/pyinfra/queue/threaded_tenants.py index a18e66f..219f3d2 100644 --- a/pyinfra/queue/threaded_tenants.py +++ b/pyinfra/queue/threaded_tenants.py @@ -250,7 +250,6 @@ class ServiceQueueManager(BaseQueueManager): self.service_response_exchange_name = settings.rabbitmq.service_response_exchange_name self.service_request_queue_prefix = settings.rabbitmq.service_request_queue_prefix - self.service_response_queue_prefix = settings.rabbitmq.service_response_queue_prefix self.service_dlq_name = settings.rabbitmq.service_dlq_name @@ -276,18 +275,6 @@ class ServiceQueueManager(BaseQueueManager): queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id ) - response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" - self.channel.queue_declare( - queue=response_queue_name, - durable=True, - arguments={ - "x-dead-letter-exchange": "", - "x-dead-letter-routing-key": self.service_dlq_name, - "x-expires": self.queue_expiration_time, # TODO: check if necessary - }, - ) - self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id) - @retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(requests.exceptions.HTTPError, requests.exceptions.ConnectionError)) def get_initial_tenant_ids(self, tenant_endpoint_url: str) -> list: response = requests.get(tenant_endpoint_url, timeout=10) @@ -360,9 +347,7 @@ class ServiceQueueManager(BaseQueueManager): try: for tenant_id in self.tenant_ids: request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}" - response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" self.channel.queue_purge(request_queue_name) - self.channel.queue_purge(response_queue_name) logger.info("Queues purged.") except pika.exceptions.ChannelWrongStateError: pass @@ -380,18 +365,6 @@ class ServiceQueueManager(BaseQueueManager): ) self.channel.queue_bind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id) - response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" - self.channel.queue_declare( - queue=response_queue_name, - durable=True, - arguments={ - "x-dead-letter-exchange": "", - "x-dead-letter-routing-key": self.service_dlq_name, - "x-expires": self.queue_expiration_time, # TODO: check if necessary - }, - ) - self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id) - self.tenant_ids.append(tenant_id) logger.debug(f"Added tenant {tenant_id}.") @@ -400,10 +373,6 @@ class ServiceQueueManager(BaseQueueManager): self.channel.queue_unbind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id) self.channel.queue_delete(request_queue_name) - response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" - self.channel.queue_unbind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id) - self.channel.queue_delete(response_queue_name) - self.tenant_ids.remove(tenant_id) logger.debug(f"Deleted tenant {tenant_id}.")