Merge branch 'RED-9331' into 'main'
RED-9331: Implement fair upload / analysis processing per tenant See merge request fforesight/tenant-commons!17
This commit is contained in:
commit
b59fc9278d
@ -11,6 +11,7 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
import lombok.AccessLevel;
|
import lombok.AccessLevel;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
import lombok.experimental.FieldDefaults;
|
import lombok.experimental.FieldDefaults;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
@ -25,13 +26,17 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
|||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SneakyThrows
|
||||||
public void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments) {
|
public void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments) {
|
||||||
|
|
||||||
String queueName = queueNamePrefix + "_" + routingKey;
|
String queueName = queueNamePrefix + "_" + routingKey;
|
||||||
Queue queue = new Queue(queueName, true, false, false);
|
Queue queue = new Queue(queueName, true, false, false);
|
||||||
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
|
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
|
||||||
rabbitAdmin.declareQueue(queue);
|
|
||||||
|
String returnedQueueName = rabbitAdmin.declareQueue(queue);
|
||||||
|
log.debug("Declared queue {}", returnedQueueName);
|
||||||
rabbitAdmin.declareBinding(binding);
|
rabbitAdmin.declareBinding(binding);
|
||||||
|
log.debug("Declared binding");
|
||||||
addQueueToListener(listenerId, queueName);
|
addQueueToListener(listenerId, queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,7 +48,12 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
|||||||
removeQueueFromListener(listenerId, queueName);
|
removeQueueFromListener(listenerId, queueName);
|
||||||
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
|
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
|
||||||
rabbitAdmin.removeBinding(binding);
|
rabbitAdmin.removeBinding(binding);
|
||||||
rabbitAdmin.deleteQueue(queueName);
|
log.debug("Removed binding");
|
||||||
|
if(rabbitAdmin.deleteQueue(queueName)) {
|
||||||
|
log.info("Deleted queue {} successfully", queueName);
|
||||||
|
} else {
|
||||||
|
log.info("Queue deletion failed");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,7 +76,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
|||||||
log.info("Adding queue " + queueName + " to listener " + listenerId);
|
log.info("Adding queue " + queueName + " to listener " + listenerId);
|
||||||
if (!checkQueueExistOnListener(listenerId, queueName)) {
|
if (!checkQueueExistOnListener(listenerId, queueName)) {
|
||||||
getMessageListenerContainerById(listenerId).addQueueNames(queueName);
|
getMessageListenerContainerById(listenerId).addQueueNames(queueName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -75,7 +85,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
|||||||
|
|
||||||
log.info("Removing queue " + queueName + " from listener " + listenerId);
|
log.info("Removing queue " + queueName + " from listener " + listenerId);
|
||||||
if (checkQueueExistOnListener(listenerId, queueName)) {
|
if (checkQueueExistOnListener(listenerId, queueName)) {
|
||||||
|
|
||||||
getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
|
getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
|
||||||
rabbitAdmin.deleteQueue(queueName);
|
rabbitAdmin.deleteQueue(queueName);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package com.knecon.fforesight.tenantcommons.queue;
|
package com.knecon.fforesight.tenantcommons.queue;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
import com.knecon.fforesight.tenantcommons.TenantProvider;
|
import com.knecon.fforesight.tenantcommons.TenantProvider;
|
||||||
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
|
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
|
||||||
@ -9,6 +10,7 @@ import com.knecon.fforesight.tenantcommons.model.TenantResponse;
|
|||||||
|
|
||||||
import lombok.AccessLevel;
|
import lombok.AccessLevel;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
import lombok.experimental.FieldDefaults;
|
import lombok.experimental.FieldDefaults;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
@ -19,51 +21,73 @@ public abstract class TenantExchangeMessageReceiver {
|
|||||||
|
|
||||||
RabbitQueueFromExchangeService rabbitQueueService;
|
RabbitQueueFromExchangeService rabbitQueueService;
|
||||||
TenantProvider tenantProvider;
|
TenantProvider tenantProvider;
|
||||||
|
Semaphore semaphore = new Semaphore(1);
|
||||||
|
|
||||||
|
|
||||||
protected abstract Set<TenantQueueConfiguration> getTenantQueueConfigs();
|
protected abstract Set<TenantQueueConfiguration> getTenantQueueConfigs();
|
||||||
|
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
public void initializeQueues() {
|
public void initializeQueues() {
|
||||||
|
|
||||||
log.info("Initializing queues for all tenants.");
|
try {
|
||||||
tenantProvider.getTenants()
|
semaphore.acquire();
|
||||||
.forEach(tenant -> {
|
|
||||||
getTenantQueueConfigs().parallelStream()
|
log.info("Initializing queues for all tenants.");
|
||||||
.forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
|
tenantProvider.getTenants()
|
||||||
tqc.getQueuePrefix(),
|
.forEach(tenant -> {
|
||||||
tqc.getExchangeName(),
|
getTenantQueueConfigs().parallelStream()
|
||||||
tenant.getTenantId(),
|
.forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
|
||||||
tqc.getDlqName(),
|
tqc.getQueuePrefix(),
|
||||||
tqc.getArguments()));
|
tqc.getExchangeName(),
|
||||||
log.info("Initialized queues for tenant " + tenant.getTenantId());
|
tenant.getTenantId(),
|
||||||
});
|
tqc.getDlqName(),
|
||||||
|
tqc.getArguments()));
|
||||||
|
log.info("Initialized queues for tenant " + tenant.getTenantId());
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
|
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
|
||||||
|
|
||||||
String tenantId = tenantCreatedEvent.getTenantId();
|
try {
|
||||||
log.info("Creating queues for new tenant " + tenantId);
|
semaphore.acquire();
|
||||||
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
|
|
||||||
tqc.getQueuePrefix(),
|
String tenantId = tenantCreatedEvent.getTenantId();
|
||||||
tqc.getExchangeName(),
|
log.info("Creating queues for new tenant " + tenantId);
|
||||||
tenantId,
|
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
|
||||||
tqc.getDlqName(),
|
tqc.getQueuePrefix(),
|
||||||
tqc.getArguments()));
|
tqc.getExchangeName(),
|
||||||
|
tenantId,
|
||||||
|
tqc.getDlqName(),
|
||||||
|
tqc.getArguments()));
|
||||||
|
} finally {
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
public void reactToTenantDeletion(TenantResponse tenantResponse) {
|
public void reactToTenantDeletion(TenantResponse tenantResponse) {
|
||||||
|
|
||||||
String tenantId = tenantResponse.getTenantId();
|
try {
|
||||||
log.info("Removing queues for deleted tenant " + tenantId);
|
semaphore.acquire();
|
||||||
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(),
|
|
||||||
tqc.getQueuePrefix(),
|
String tenantId = tenantResponse.getTenantId();
|
||||||
tqc.getExchangeName(),
|
log.info("Removing queues for deleted tenant " + tenantId);
|
||||||
tenantId,
|
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(),
|
||||||
tqc.getDlqName(),
|
tqc.getQueuePrefix(),
|
||||||
tqc.getArguments()));
|
tqc.getExchangeName(),
|
||||||
|
tenantId,
|
||||||
|
tqc.getDlqName(),
|
||||||
|
tqc.getArguments()));
|
||||||
|
} finally {
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -3,8 +3,6 @@ package com.knecon.fforesight.tenantcommons.task;
|
|||||||
import static org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME;
|
import static org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME;
|
||||||
import static org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
|
import static org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
|
||||||
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
import org.springframework.boot.task.TaskExecutorBuilder;
|
import org.springframework.boot.task.TaskExecutorBuilder;
|
||||||
@ -19,7 +17,7 @@ public class ApplicationTaskExecutorBeanConfig {
|
|||||||
|
|
||||||
@Lazy
|
@Lazy
|
||||||
@Bean(name = {APPLICATION_TASK_EXECUTOR_BEAN_NAME, DEFAULT_TASK_EXECUTOR_BEAN_NAME})
|
@Bean(name = {APPLICATION_TASK_EXECUTOR_BEAN_NAME, DEFAULT_TASK_EXECUTOR_BEAN_NAME})
|
||||||
@ConditionalOnMissingBean({Executor.class})
|
@ConditionalOnMissingBean({ThreadPoolTaskExecutor.class})
|
||||||
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
|
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user