RED-9331: Implement fair upload / analysis processing per tenant

This commit is contained in:
Maverick Studer 2024-08-26 14:38:23 +02:00
parent 30cf9a68af
commit 71e3082cd0
3 changed files with 67 additions and 35 deletions

View File

@ -11,6 +11,7 @@ import org.springframework.stereotype.Service;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@ -25,13 +26,17 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
@Override
@SneakyThrows
public void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments) {
String queueName = queueNamePrefix + "_" + routingKey;
Queue queue = new Queue(queueName, true, false, false);
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
rabbitAdmin.declareQueue(queue);
String returnedQueueName = rabbitAdmin.declareQueue(queue);
log.debug("Declared queue {}", returnedQueueName);
rabbitAdmin.declareBinding(binding);
log.debug("Declared binding");
addQueueToListener(listenerId, queueName);
}
@ -43,7 +48,12 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
removeQueueFromListener(listenerId, queueName);
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
rabbitAdmin.removeBinding(binding);
rabbitAdmin.deleteQueue(queueName);
log.debug("Removed binding");
if(rabbitAdmin.deleteQueue(queueName)) {
log.info("Deleted queue {} successfully", queueName);
} else {
log.info("Queue deletion failed");
}
}
@ -66,7 +76,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
log.info("Adding queue " + queueName + " to listener " + listenerId);
if (!checkQueueExistOnListener(listenerId, queueName)) {
getMessageListenerContainerById(listenerId).addQueueNames(queueName);
}
}
}
@ -75,7 +85,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
log.info("Removing queue " + queueName + " from listener " + listenerId);
if (checkQueueExistOnListener(listenerId, queueName)) {
getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
rabbitAdmin.deleteQueue(queueName);
}

View File

@ -1,6 +1,7 @@
package com.knecon.fforesight.tenantcommons.queue;
import java.util.Set;
import java.util.concurrent.Semaphore;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
@ -9,6 +10,7 @@ import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@ -19,51 +21,73 @@ public abstract class TenantExchangeMessageReceiver {
RabbitQueueFromExchangeService rabbitQueueService;
TenantProvider tenantProvider;
Semaphore semaphore = new Semaphore(1);
protected abstract Set<TenantQueueConfiguration> getTenantQueueConfigs();
@SneakyThrows
public void initializeQueues() {
log.info("Initializing queues for all tenants.");
tenantProvider.getTenants()
.forEach(tenant -> {
getTenantQueueConfigs().parallelStream()
.forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenant.getTenantId(),
tqc.getDlqName(),
tqc.getArguments()));
log.info("Initialized queues for tenant " + tenant.getTenantId());
});
try {
semaphore.acquire();
log.info("Initializing queues for all tenants.");
tenantProvider.getTenants()
.forEach(tenant -> {
getTenantQueueConfigs().parallelStream()
.forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenant.getTenantId(),
tqc.getDlqName(),
tqc.getArguments()));
log.info("Initialized queues for tenant " + tenant.getTenantId());
});
} finally {
semaphore.release();
}
}
@SneakyThrows
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
String tenantId = tenantCreatedEvent.getTenantId();
log.info("Creating queues for new tenant " + tenantId);
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenantId,
tqc.getDlqName(),
tqc.getArguments()));
try {
semaphore.acquire();
String tenantId = tenantCreatedEvent.getTenantId();
log.info("Creating queues for new tenant " + tenantId);
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenantId,
tqc.getDlqName(),
tqc.getArguments()));
} finally {
semaphore.release();
}
}
@SneakyThrows
public void reactToTenantDeletion(TenantResponse tenantResponse) {
String tenantId = tenantResponse.getTenantId();
log.info("Removing queues for deleted tenant " + tenantId);
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenantId,
tqc.getDlqName(),
tqc.getArguments()));
try {
semaphore.acquire();
String tenantId = tenantResponse.getTenantId();
log.info("Removing queues for deleted tenant " + tenantId);
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenantId,
tqc.getDlqName(),
tqc.getArguments()));
} finally {
semaphore.release();
}
}

View File

@ -3,8 +3,6 @@ package com.knecon.fforesight.tenantcommons.task;
import static org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME;
import static org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
import java.util.concurrent.Executor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.task.TaskExecutorBuilder;
@ -19,7 +17,7 @@ public class ApplicationTaskExecutorBeanConfig {
@Lazy
@Bean(name = {APPLICATION_TASK_EXECUTOR_BEAN_NAME, DEFAULT_TASK_EXECUTOR_BEAN_NAME})
@ConditionalOnMissingBean({Executor.class})
@ConditionalOnMissingBean({ThreadPoolTaskExecutor.class})
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}