Merge branch 'RED-9331' into 'main'

RED-9331: Explore possibilities for fair upload / analysis processing per tenant

See merge request fforesight/tenant-commons!15
This commit is contained in:
Maverick Studer 2024-07-18 13:51:11 +02:00
commit 00c057f3f6
8 changed files with 364 additions and 1 deletions

View File

@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@AutoConfiguration
@EnableFeignClients(basePackageClasses = TenantsClient.class)
@ComponentScan(basePackageClasses = MultiTenancyAutoConfiguration.class)
@ComponentScan(basePackageClasses = {MultiTenancyAutoConfiguration.class})
public class MultiTenancyAutoConfiguration {
@PostConstruct

View File

@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
@ -87,4 +89,10 @@ public class MultiTenancyMessagingConfiguration {
return message;
};
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}

View File

@ -0,0 +1,14 @@
package com.knecon.fforesight.tenantcommons.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TenantCreatedEvent {
private String tenantId;
}

View File

@ -0,0 +1,24 @@
package com.knecon.fforesight.tenantcommons.model;
import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TenantQueueConfiguration {
private String listenerId;
private String exchangeName;
private String queuePrefix;
private String dlqName;
@Builder.Default
private Map<String, Object> arguments = new HashMap<>();
}

View File

@ -0,0 +1,20 @@
package com.knecon.fforesight.tenantcommons.queue;
import java.util.Map;
public interface RabbitQueueFromExchangeService {
void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments);
void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments);
void addQueueToListener(String listenerId, String queueName);
void removeQueueFromListener(String listenerId, String queueName);
boolean checkQueueExistOnListener(String listenerId, String queueName);
}

View File

@ -0,0 +1,107 @@
package com.knecon.fforesight.tenantcommons.queue;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.stereotype.Service;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchangeService {
RabbitAdmin rabbitAdmin;
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@Override
public void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments) {
String queueName = queueNamePrefix + "_" + routingKey;
Queue queue = new Queue(queueName, true, false, false);
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
addQueueToListener(listenerId, queueName);
}
@Override
public void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments) {
String queueName = queueNamePrefix + "_" + routingKey;
removeQueueFromListener(listenerId, queueName);
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
rabbitAdmin.removeBinding(binding);
rabbitAdmin.deleteQueue(queueName);
}
private static Binding getBinding(String exchangeName, String routingKey, String dlq, String queueName, Map<String, Object> arguments) {
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
if (dlq != null && !dlq.isBlank()) {
binding.addArgument("x-dead-letter-exchange", "");
binding.addArgument("x-dead-letter-routing-key", dlq);
arguments.forEach(binding::addArgument);
}
return binding;
}
@Override
public void addQueueToListener(String listenerId, String queueName) {
log.info("Adding queue " + queueName + " to listener " + listenerId);
if (!checkQueueExistOnListener(listenerId, queueName)) {
getMessageListenerContainerById(listenerId).addQueueNames(queueName);
}
}
@Override
public void removeQueueFromListener(String listenerId, String queueName) {
log.info("Removing queue " + queueName + " from listener " + listenerId);
if (checkQueueExistOnListener(listenerId, queueName)) {
getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
rabbitAdmin.deleteQueue(queueName);
}
}
@Override
public boolean checkQueueExistOnListener(String listenerId, String queueName) {
log.debug("Checking if queue " + queueName + " exists on listener " + listenerId);
String[] queueNames = getMessageListenerContainerById(listenerId).getQueueNames();
for (String name : queueNames) {
if (name.equals(queueName)) {
log.debug("Queue with name : " + queueName + " already exists on listener");
return true;
}
}
log.debug("Queue with name : " + queueName + " does not exist on listener");
return false;
}
private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
log.debug("Getting message listener container by id " + listenerId);
return ((AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(listenerId));
}
}

View File

@ -0,0 +1,70 @@
package com.knecon.fforesight.tenantcommons.queue;
import java.util.Set;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public abstract class TenantExchangeMessageReceiver {
RabbitQueueFromExchangeService rabbitQueueService;
TenantProvider tenantProvider;
protected abstract Set<TenantQueueConfiguration> getTenantQueueConfigs();
public void initializeQueues() {
log.info("Initializing queues for all tenants.");
tenantProvider.getTenants()
.forEach(tenant -> {
getTenantQueueConfigs().parallelStream()
.forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenant.getTenantId(),
tqc.getDlqName(),
tqc.getArguments()));
log.info("Initialized queues for tenant " + tenant.getTenantId());
});
}
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
String tenantId = tenantCreatedEvent.getTenantId();
log.info("Creating queues for new tenant " + tenantId);
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenantId,
tqc.getDlqName(),
tqc.getArguments()));
}
public void reactToTenantDeletion(TenantResponse tenantResponse) {
String tenantId = tenantResponse.getTenantId();
log.info("Removing queues for deleted tenant " + tenantId);
getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(),
tqc.getQueuePrefix(),
tqc.getExchangeName(),
tenantId,
tqc.getDlqName(),
tqc.getArguments()));
}
}

View File

@ -0,0 +1,120 @@
package com.knecon.fforesight.tenantcommons.queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import lombok.NoArgsConstructor;
@NoArgsConstructor
public abstract class TenantMessagingConfiguration {
@Value("${POD_NAME:}")
private String podName;
private static final String TENANT_EVENTS_DLQ_SUFFIX = "_tenant_events_dlq";
private static final String TENANT_CREATED_QUEUE_SUFFIX = "_tenant_created_queue";
private static final String TENANT_DELETED_QUEUE_SUFFIX = "_tenant_deleted_queue";
// time in ms after which a deletion will be executed when no consumer is present
// see: https://www.rabbitmq.com/docs/ttl#queue-ttl
public static final int QUEUE_EXPIRATION_TIME = 300000; // 5 minutes
public String getTenantCreatedQueueName() {
return getQueueNameWithSuffix(TENANT_CREATED_QUEUE_SUFFIX);
}
public String getTenantDeletedQueueName() {
return getQueueNameWithSuffix(TENANT_DELETED_QUEUE_SUFFIX);
}
public String getTenantEventsDLQName() {
return getQueueNameWithSuffix(TENANT_EVENTS_DLQ_SUFFIX);
}
protected String getQueueNameWithSuffix(String suffix) {
if (!useDefaultQueueName() && podName != null && !podName.isEmpty()) {
return podName + suffix;
} else {
return getDefaultQueueName(suffix);
}
}
protected boolean useDefaultQueueName() {
return false;
}
// This method will be overridden by subclasses if the POD_NAME is not set.
// Default implementation throws an exception to ensure it's implemented if used.
protected String getDefaultQueueName(String suffix) {
throw new UnsupportedOperationException("Queue name method not implemented");
}
@Bean(name = "tenantExchange")
public TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name:tenants-exchange}") String tenantExchangeName) {
return new TopicExchange(tenantExchangeName);
}
@Bean("tenantCreatedQueue")
public Queue tenantCreatedQueue() {
return QueueBuilder.durable(getTenantCreatedQueueName())
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", getTenantEventsDLQName())
.withArgument("x-expires", QUEUE_EXPIRATION_TIME)
.build();
}
@Bean("tenantDeletedQueue")
public Queue tenantDeletedQueue() {
return QueueBuilder.durable(getTenantDeletedQueueName())
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", getTenantEventsDLQName())
.withArgument("x-expires", QUEUE_EXPIRATION_TIME)
.build();
}
@Bean
public Queue tenantEventsDLQ() {
return QueueBuilder.durable(getTenantEventsDLQName()).withArgument("x-expires", QUEUE_EXPIRATION_TIME).build();
}
@Bean
public Binding tenantCreatedBinding(@Qualifier("tenantCreatedQueue") Queue tenantCreatedQueue, @Qualifier("tenantExchange") TopicExchange tenantExchange) {
return BindingBuilder.bind(tenantCreatedQueue).to(tenantExchange).with("tenant.created");
}
@Bean
public Binding tenantDeletedBinding(@Qualifier("tenantDeletedQueue") Queue tenantDeletedQueue, @Qualifier("tenantExchange") TopicExchange tenantExchange) {
return BindingBuilder.bind(tenantDeletedQueue).to(tenantExchange).with("tenant.delete");
}
}