From 71e3082cd0acf55038da2dd2628a96d25ea0abc2 Mon Sep 17 00:00:00 2001 From: Maverick Studer Date: Mon, 26 Aug 2024 14:38:23 +0200 Subject: [PATCH] RED-9331: Implement fair upload / analysis processing per tenant --- .../RabbitQueueFromExchangeServiceImpl.java | 18 ++++- .../queue/TenantExchangeMessageReceiver.java | 80 ++++++++++++------- .../ApplicationTaskExecutorBeanConfig.java | 4 +- 3 files changed, 67 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java index 5a63ec2..3f116a3 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java @@ -11,6 +11,7 @@ import org.springframework.stereotype.Service; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.FieldDefaults; import lombok.extern.slf4j.Slf4j; @@ -25,13 +26,17 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan @Override + @SneakyThrows public void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments) { String queueName = queueNamePrefix + "_" + routingKey; Queue queue = new Queue(queueName, true, false, false); Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments); - rabbitAdmin.declareQueue(queue); + + String returnedQueueName = rabbitAdmin.declareQueue(queue); + log.debug("Declared queue {}", returnedQueueName); rabbitAdmin.declareBinding(binding); + log.debug("Declared binding"); addQueueToListener(listenerId, queueName); } @@ -43,7 +48,12 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan removeQueueFromListener(listenerId, queueName); Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments); rabbitAdmin.removeBinding(binding); - rabbitAdmin.deleteQueue(queueName); + log.debug("Removed binding"); + if(rabbitAdmin.deleteQueue(queueName)) { + log.info("Deleted queue {} successfully", queueName); + } else { + log.info("Queue deletion failed"); + } } @@ -66,7 +76,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan log.info("Adding queue " + queueName + " to listener " + listenerId); if (!checkQueueExistOnListener(listenerId, queueName)) { getMessageListenerContainerById(listenerId).addQueueNames(queueName); - } + } } @@ -75,7 +85,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan log.info("Removing queue " + queueName + " from listener " + listenerId); if (checkQueueExistOnListener(listenerId, queueName)) { - + getMessageListenerContainerById(listenerId).removeQueueNames(queueName); rabbitAdmin.deleteQueue(queueName); } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java index e03d156..faa2381 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java @@ -1,6 +1,7 @@ package com.knecon.fforesight.tenantcommons.queue; import java.util.Set; +import java.util.concurrent.Semaphore; import com.knecon.fforesight.tenantcommons.TenantProvider; import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; @@ -9,6 +10,7 @@ import com.knecon.fforesight.tenantcommons.model.TenantResponse; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.FieldDefaults; import lombok.extern.slf4j.Slf4j; @@ -19,51 +21,73 @@ public abstract class TenantExchangeMessageReceiver { RabbitQueueFromExchangeService rabbitQueueService; TenantProvider tenantProvider; + Semaphore semaphore = new Semaphore(1); protected abstract Set getTenantQueueConfigs(); + @SneakyThrows public void initializeQueues() { - log.info("Initializing queues for all tenants."); - tenantProvider.getTenants() - .forEach(tenant -> { - getTenantQueueConfigs().parallelStream() - .forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), - tqc.getQueuePrefix(), - tqc.getExchangeName(), - tenant.getTenantId(), - tqc.getDlqName(), - tqc.getArguments())); - log.info("Initialized queues for tenant " + tenant.getTenantId()); - }); + try { + semaphore.acquire(); + + log.info("Initializing queues for all tenants."); + tenantProvider.getTenants() + .forEach(tenant -> { + getTenantQueueConfigs().parallelStream() + .forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenant.getTenantId(), + tqc.getDlqName(), + tqc.getArguments())); + log.info("Initialized queues for tenant " + tenant.getTenantId()); + }); + } finally { + semaphore.release(); + } } + @SneakyThrows public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) { - String tenantId = tenantCreatedEvent.getTenantId(); - log.info("Creating queues for new tenant " + tenantId); - getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), - tqc.getQueuePrefix(), - tqc.getExchangeName(), - tenantId, - tqc.getDlqName(), - tqc.getArguments())); + try { + semaphore.acquire(); + + String tenantId = tenantCreatedEvent.getTenantId(); + log.info("Creating queues for new tenant " + tenantId); + getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenantId, + tqc.getDlqName(), + tqc.getArguments())); + } finally { + semaphore.release(); + } } + @SneakyThrows public void reactToTenantDeletion(TenantResponse tenantResponse) { - String tenantId = tenantResponse.getTenantId(); - log.info("Removing queues for deleted tenant " + tenantId); - getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(), - tqc.getQueuePrefix(), - tqc.getExchangeName(), - tenantId, - tqc.getDlqName(), - tqc.getArguments())); + try { + semaphore.acquire(); + + String tenantId = tenantResponse.getTenantId(); + log.info("Removing queues for deleted tenant " + tenantId); + getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenantId, + tqc.getDlqName(), + tqc.getArguments())); + } finally { + semaphore.release(); + } } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java b/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java index 58d2487..63026e9 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java @@ -3,8 +3,6 @@ package com.knecon.fforesight.tenantcommons.task; import static org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME; import static org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME; -import java.util.concurrent.Executor; - import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.task.TaskExecutorBuilder; @@ -19,7 +17,7 @@ public class ApplicationTaskExecutorBeanConfig { @Lazy @Bean(name = {APPLICATION_TASK_EXECUTOR_BEAN_NAME, DEFAULT_TASK_EXECUTOR_BEAN_NAME}) - @ConditionalOnMissingBean({Executor.class}) + @ConditionalOnMissingBean({ThreadPoolTaskExecutor.class}) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }