From f9679a68be9add0081719c0ec947642a0c45a929 Mon Sep 17 00:00:00 2001 From: Maverick Studer Date: Thu, 18 Jul 2024 13:51:10 +0200 Subject: [PATCH] RED-9331: Explore possibilities for fair upload / analysis processing per tenant --- .../MultiTenancyAutoConfiguration.java | 2 +- .../MultiTenancyMessagingConfiguration.java | 8 ++ .../model/TenantCreatedEvent.java | 14 ++ .../model/TenantQueueConfiguration.java | 24 ++++ .../queue/RabbitQueueFromExchangeService.java | 20 +++ .../RabbitQueueFromExchangeServiceImpl.java | 107 ++++++++++++++++ .../queue/TenantExchangeMessageReceiver.java | 70 ++++++++++ .../queue/TenantMessagingConfiguration.java | 120 ++++++++++++++++++ 8 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueConfiguration.java create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyAutoConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyAutoConfiguration.java index 1944b61..67e1c69 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyAutoConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyAutoConfiguration.java @@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @AutoConfiguration @EnableFeignClients(basePackageClasses = TenantsClient.class) -@ComponentScan(basePackageClasses = MultiTenancyAutoConfiguration.class) +@ComponentScan(basePackageClasses = {MultiTenancyAutoConfiguration.class}) public class MultiTenancyAutoConfiguration { @PostConstruct diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java index b59a908..1f63601 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java @@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; @@ -87,4 +89,10 @@ public class MultiTenancyMessagingConfiguration { return message; }; } + + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + + return new RabbitAdmin(connectionFactory); + } } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java new file mode 100644 index 0000000..43260f4 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java @@ -0,0 +1,14 @@ +package com.knecon.fforesight.tenantcommons.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class TenantCreatedEvent { + + private String tenantId; + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueConfiguration.java new file mode 100644 index 0000000..f5ac2c0 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueConfiguration.java @@ -0,0 +1,24 @@ +package com.knecon.fforesight.tenantcommons.model; + +import java.util.HashMap; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TenantQueueConfiguration { + + private String listenerId; + private String exchangeName; + private String queuePrefix; + private String dlqName; + @Builder.Default + private Map arguments = new HashMap<>(); + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java new file mode 100644 index 0000000..80b8381 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java @@ -0,0 +1,20 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import java.util.Map; + +public interface RabbitQueueFromExchangeService { + + void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments); + + void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments); + + + void addQueueToListener(String listenerId, String queueName); + + + void removeQueueFromListener(String listenerId, String queueName); + + + boolean checkQueueExistOnListener(String listenerId, String queueName); + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java new file mode 100644 index 0000000..5a63ec2 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java @@ -0,0 +1,107 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import java.util.Map; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.stereotype.Service; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchangeService { + + RabbitAdmin rabbitAdmin; + RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; + + + @Override + public void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments) { + + String queueName = queueNamePrefix + "_" + routingKey; + Queue queue = new Queue(queueName, true, false, false); + Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments); + rabbitAdmin.declareQueue(queue); + rabbitAdmin.declareBinding(binding); + addQueueToListener(listenerId, queueName); + } + + + @Override + public void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments) { + + String queueName = queueNamePrefix + "_" + routingKey; + removeQueueFromListener(listenerId, queueName); + Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments); + rabbitAdmin.removeBinding(binding); + rabbitAdmin.deleteQueue(queueName); + + } + + + private static Binding getBinding(String exchangeName, String routingKey, String dlq, String queueName, Map arguments) { + + Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); + if (dlq != null && !dlq.isBlank()) { + binding.addArgument("x-dead-letter-exchange", ""); + binding.addArgument("x-dead-letter-routing-key", dlq); + arguments.forEach(binding::addArgument); + } + return binding; + } + + + @Override + public void addQueueToListener(String listenerId, String queueName) { + + log.info("Adding queue " + queueName + " to listener " + listenerId); + if (!checkQueueExistOnListener(listenerId, queueName)) { + getMessageListenerContainerById(listenerId).addQueueNames(queueName); + } + } + + + @Override + public void removeQueueFromListener(String listenerId, String queueName) { + + log.info("Removing queue " + queueName + " from listener " + listenerId); + if (checkQueueExistOnListener(listenerId, queueName)) { + + getMessageListenerContainerById(listenerId).removeQueueNames(queueName); + rabbitAdmin.deleteQueue(queueName); + } + } + + + @Override + public boolean checkQueueExistOnListener(String listenerId, String queueName) { + + log.debug("Checking if queue " + queueName + " exists on listener " + listenerId); + String[] queueNames = getMessageListenerContainerById(listenerId).getQueueNames(); + for (String name : queueNames) { + if (name.equals(queueName)) { + log.debug("Queue with name : " + queueName + " already exists on listener"); + return true; + } + } + log.debug("Queue with name : " + queueName + " does not exist on listener"); + return false; + } + + + private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) { + + log.debug("Getting message listener container by id " + listenerId); + return ((AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(listenerId)); + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java new file mode 100644 index 0000000..e03d156 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java @@ -0,0 +1,70 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import java.util.Set; + +import com.knecon.fforesight.tenantcommons.TenantProvider; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +public abstract class TenantExchangeMessageReceiver { + + RabbitQueueFromExchangeService rabbitQueueService; + TenantProvider tenantProvider; + + + protected abstract Set getTenantQueueConfigs(); + + + public void initializeQueues() { + + log.info("Initializing queues for all tenants."); + tenantProvider.getTenants() + .forEach(tenant -> { + getTenantQueueConfigs().parallelStream() + .forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenant.getTenantId(), + tqc.getDlqName(), + tqc.getArguments())); + log.info("Initialized queues for tenant " + tenant.getTenantId()); + }); + } + + + public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) { + + String tenantId = tenantCreatedEvent.getTenantId(); + log.info("Creating queues for new tenant " + tenantId); + getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenantId, + tqc.getDlqName(), + tqc.getArguments())); + } + + + public void reactToTenantDeletion(TenantResponse tenantResponse) { + + String tenantId = tenantResponse.getTenantId(); + log.info("Removing queues for deleted tenant " + tenantId); + getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenantId, + tqc.getDlqName(), + tqc.getArguments())); + + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java new file mode 100644 index 0000000..fd0f720 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java @@ -0,0 +1,120 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; + +import lombok.NoArgsConstructor; + +@NoArgsConstructor +public abstract class TenantMessagingConfiguration { + + @Value("${POD_NAME:}") + private String podName; + + private static final String TENANT_EVENTS_DLQ_SUFFIX = "_tenant_events_dlq"; + private static final String TENANT_CREATED_QUEUE_SUFFIX = "_tenant_created_queue"; + private static final String TENANT_DELETED_QUEUE_SUFFIX = "_tenant_deleted_queue"; + + // time in ms after which a deletion will be executed when no consumer is present + // see: https://www.rabbitmq.com/docs/ttl#queue-ttl + public static final int QUEUE_EXPIRATION_TIME = 300000; // 5 minutes + + + public String getTenantCreatedQueueName() { + + return getQueueNameWithSuffix(TENANT_CREATED_QUEUE_SUFFIX); + } + + + public String getTenantDeletedQueueName() { + + return getQueueNameWithSuffix(TENANT_DELETED_QUEUE_SUFFIX); + } + + + public String getTenantEventsDLQName() { + + return getQueueNameWithSuffix(TENANT_EVENTS_DLQ_SUFFIX); + } + + + protected String getQueueNameWithSuffix(String suffix) { + + if (!useDefaultQueueName() && podName != null && !podName.isEmpty()) { + return podName + suffix; + } else { + return getDefaultQueueName(suffix); + } + } + + + protected boolean useDefaultQueueName() { + + return false; + } + + + // This method will be overridden by subclasses if the POD_NAME is not set. + // Default implementation throws an exception to ensure it's implemented if used. + protected String getDefaultQueueName(String suffix) { + + throw new UnsupportedOperationException("Queue name method not implemented"); + } + + + @Bean(name = "tenantExchange") + public TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name:tenants-exchange}") String tenantExchangeName) { + + return new TopicExchange(tenantExchangeName); + } + + + @Bean("tenantCreatedQueue") + public Queue tenantCreatedQueue() { + + return QueueBuilder.durable(getTenantCreatedQueueName()) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", getTenantEventsDLQName()) + .withArgument("x-expires", QUEUE_EXPIRATION_TIME) + .build(); + } + + + @Bean("tenantDeletedQueue") + public Queue tenantDeletedQueue() { + + return QueueBuilder.durable(getTenantDeletedQueueName()) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", getTenantEventsDLQName()) + .withArgument("x-expires", QUEUE_EXPIRATION_TIME) + .build(); + } + + + @Bean + public Queue tenantEventsDLQ() { + + return QueueBuilder.durable(getTenantEventsDLQName()).withArgument("x-expires", QUEUE_EXPIRATION_TIME).build(); + } + + + @Bean + public Binding tenantCreatedBinding(@Qualifier("tenantCreatedQueue") Queue tenantCreatedQueue, @Qualifier("tenantExchange") TopicExchange tenantExchange) { + + return BindingBuilder.bind(tenantCreatedQueue).to(tenantExchange).with("tenant.created"); + } + + + @Bean + public Binding tenantDeletedBinding(@Qualifier("tenantDeletedQueue") Queue tenantDeletedQueue, @Qualifier("tenantExchange") TopicExchange tenantExchange) { + + return BindingBuilder.bind(tenantDeletedQueue).to(tenantExchange).with("tenant.delete"); + } + +}