diff --git a/.gitignore b/.gitignore index f8f4245..04dacd1 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,5 @@ gradlew.bat gradle/ **/.gradle -**/build \ No newline at end of file +**/build +.DS_Store diff --git a/build.gradle.kts b/build.gradle.kts index f779e86..dde257b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,7 +1,6 @@ plugins { `java-library` `maven-publish` - `kotlin-dsl` pmd checkstyle jacoco @@ -13,7 +12,8 @@ val storageCommonsVersion = "2.43.0" val springBootVersion = "3.1.5" val springCloudVersion = "4.0.4" val springRabbitTest = "3.0.9" -val lombokVersion = "1.18.30" +val testContainersVersion = "1.20.1" + dependencies { api("com.iqser.red.commons:storage-commons:${storageCommonsVersion}") @@ -21,10 +21,10 @@ dependencies { api("org.springframework.boot:spring-boot-starter-web:${springBootVersion}") api("org.springframework.cloud:spring-cloud-starter-openfeign:${springCloudVersion}") api("org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}") - api("org.projectlombok:lombok:${lombokVersion}") - runtimeOnly("org.springframework.boot:spring-boot-devtools:${springBootVersion}") + testImplementation("org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}") testImplementation("org.springframework.boot:spring-boot-starter-test:${springBootVersion}") testImplementation("org.springframework.amqp:spring-rabbit-test:${springRabbitTest}") + testImplementation("org.testcontainers:rabbitmq:${testContainersVersion}") } group = "com.knecon.fforesight" @@ -87,7 +87,7 @@ tasks.named("test") { sonarqube { properties { - property("sonar.login", providers.gradleProperty("sonarToken").getOrNull()) + providers.gradleProperty("sonarToken").getOrNull()?.let { property("sonar.login", it) } property("sonar.host.url", "https://sonarqube.knecon.com") } } @@ -107,4 +107,4 @@ tasks.jacocoTestReport { java { withJavadocJar() -} \ No newline at end of file +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java index 1f63601..850de19 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/MultiTenancyMessagingConfiguration.java @@ -1,7 +1,7 @@ package com.knecon.fforesight.tenantcommons; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import static com.knecon.fforesight.tenantcommons.ForwardTenantInterceptor.TENANT_HEADER_NAME; + import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; @@ -19,7 +19,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; -import static com.knecon.fforesight.tenantcommons.ForwardTenantInterceptor.TENANT_HEADER_NAME; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; @Configuration @ConditionalOnClass(RabbitTemplate.class) @@ -37,6 +38,7 @@ public class MultiTenancyMessagingConfiguration { @Primary public ContainerCustomizer simpleMessageListenerContainerCustomizer( ObjectProvider customizers) { + return container -> customizers.orderedStream().forEach((customizer) -> customizer.customize(container)); } @@ -45,39 +47,50 @@ public class MultiTenancyMessagingConfiguration { @Primary public RabbitTemplateCustomizer rabbitTemplateMultiCustomizer( ObjectProvider customizers) { + return rabbitTemplate -> customizers.orderedStream().forEach((customizer) -> customizer.customize(rabbitTemplate)); } + @Bean public MessageConverter producerJackson2MessageConverter() { + ObjectMapper mapper = new ObjectMapper().findAndRegisterModules(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); return new Jackson2JsonMessageConverter(mapper); } + @Bean public RabbitTemplateMultiCustomizer rabbitTemplatePublishTenantIdHeaderCustomizer( @Qualifier("tenantIdSetterPostProcessor") MessagePostProcessor messagePostProcessor) { + return template -> template.addBeforePublishPostProcessors(messagePostProcessor); } + @Bean public SimpleMessageListenerContainerCustomizer rabbitInterceptTenantIdHeaderCustomizer( @Qualifier("tenantIdGetterPostProcessor") MessagePostProcessor messagePostProcessor) { + return container -> container.addAfterReceivePostProcessors(messagePostProcessor); } + @Bean public MessagePostProcessor tenantIdSetterPostProcessor() { + return message -> { message.getMessageProperties().setHeader(TENANT_HEADER_NAME, TenantContext.getTenantId()); return message; }; } + @Bean public MessagePostProcessor tenantIdGetterPostProcessor() { + return message -> { String tenant = message.getMessageProperties().getHeader(TENANT_HEADER_NAME); @@ -90,9 +103,11 @@ public class MultiTenancyMessagingConfiguration { }; } + @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } + } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java index d7edff4..2e4b921 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java @@ -1,7 +1,5 @@ package com.knecon.fforesight.tenantcommons; -import org.jetbrains.annotations.NotNull; -import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -17,18 +15,30 @@ public class RabbitConfiguration { @Bean public BeanPostProcessor rabbitListenerContainerFactoryPostProcessor() { - return new BeanPostProcessor() { - @Override - public Object postProcessAfterInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException { + return new CustomBeanPostProcessor(); + } - if (bean instanceof SimpleRabbitListenerContainerFactory factory) { - factory.setErrorHandler(t -> { - log.error("Error occurred in Rabbit listener: ", t); - }); - } - return bean; + + public static class CustomBeanPostProcessor implements BeanPostProcessor { + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + + return bean; + } + + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + + if (bean instanceof SimpleRabbitListenerContainerFactory factory) { + factory.setErrorHandler(t -> { + log.error("Error occurred in Rabbit listener: ", t); + }); } - }; + return bean; + } + } } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/SimpleMessageListenerContainerCustomizer.java b/src/main/java/com/knecon/fforesight/tenantcommons/SimpleMessageListenerContainerCustomizer.java index 9fe3143..41e6acd 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/SimpleMessageListenerContainerCustomizer.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/SimpleMessageListenerContainerCustomizer.java @@ -6,4 +6,5 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; public interface SimpleMessageListenerContainerCustomizer { void customize(SimpleMessageListenerContainer simpleMessageListenerContainer); + } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/StorageConnectionProviderImpl.java b/src/main/java/com/knecon/fforesight/tenantcommons/StorageConnectionProviderImpl.java index 530c49a..eecb8eb 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/StorageConnectionProviderImpl.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/StorageConnectionProviderImpl.java @@ -4,7 +4,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.stereotype.Service; import com.iqser.red.storage.commons.service.StorageConnectionProvider; - import lombok.RequiredArgsConstructor; @Service diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/TenantAwareTaskDecorator.java b/src/main/java/com/knecon/fforesight/tenantcommons/TenantAwareTaskDecorator.java index 1a2b96e..b36ff15 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/TenantAwareTaskDecorator.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/TenantAwareTaskDecorator.java @@ -1,8 +1,9 @@ package com.knecon.fforesight.tenantcommons; -import com.knecon.fforesight.tenantcommons.task.KneconTaskDecorator; import org.springframework.lang.NonNull; +import com.knecon.fforesight.tenantcommons.task.KneconTaskDecorator; + public class TenantAwareTaskDecorator implements KneconTaskDecorator { @Override diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/listener/ITenantEventHandler.java b/src/main/java/com/knecon/fforesight/tenantcommons/listener/ITenantEventHandler.java new file mode 100644 index 0000000..b87fd47 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/listener/ITenantEventHandler.java @@ -0,0 +1,12 @@ +package com.knecon.fforesight.tenantcommons.listener; + +import com.knecon.fforesight.tenantcommons.model.ITenantEvent; + +public interface ITenantEventHandler { + + void handle(T event); + + + Class getEventClass(); + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/listener/impl/TenantEventQueueCreationHandler.java b/src/main/java/com/knecon/fforesight/tenantcommons/listener/impl/TenantEventQueueCreationHandler.java new file mode 100644 index 0000000..b8b9e89 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/listener/impl/TenantEventQueueCreationHandler.java @@ -0,0 +1,53 @@ +package com.knecon.fforesight.tenantcommons.listener.impl; + +import java.util.Optional; + +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.tenantcommons.listener.ITenantEventHandler; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantQueueProvider; +import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeServiceImpl; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TenantEventQueueCreationHandler implements ITenantEventHandler { + + private final RabbitQueueFromExchangeServiceImpl rabbitQueueService; + private final Optional tenantQueueProvider; + + + @PostConstruct + public void init() { + + log.info("TenantEventQueueCreationHandler initialized"); + } + + + @Override + public void handle(TenantCreatedEvent tenantCreatedEvent) { + + tenantQueueProvider.ifPresentOrElse(t -> { + log.info("Creating queues for new tenant {}", tenantCreatedEvent.getTenantId()); + + t.getTenantQueueConfigurations().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenantCreatedEvent.getTenantId(), + tqc.getDlqName(), + tqc.getArguments())); + }, () -> log.warn("This service does not provide any queues to create")); + } + + + @Override + public Class getEventClass() { + + return TenantCreatedEvent.class; + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/listener/impl/TenantEventQueueDeletionHandler.java b/src/main/java/com/knecon/fforesight/tenantcommons/listener/impl/TenantEventQueueDeletionHandler.java new file mode 100644 index 0000000..efe9906 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/listener/impl/TenantEventQueueDeletionHandler.java @@ -0,0 +1,53 @@ +package com.knecon.fforesight.tenantcommons.listener.impl; + +import java.util.Optional; + +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.tenantcommons.listener.ITenantEventHandler; +import com.knecon.fforesight.tenantcommons.model.TenantQueueProvider; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeServiceImpl; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TenantEventQueueDeletionHandler implements ITenantEventHandler { + + private final RabbitQueueFromExchangeServiceImpl rabbitQueueService; + private final Optional tenantQueueProvider; + + + @PostConstruct + public void init() { + + log.info("TenantEventQueueDeletionHandler initialized"); + } + + + @Override + public void handle(TenantResponse tenantResponse) { + + tenantQueueProvider.ifPresentOrElse(t -> { + log.info("Deleting queues for tenant {}", tenantResponse.getTenantId()); + + t.getTenantQueueConfigurations().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenantResponse.getTenantId(), + tqc.getDlqName(), + tqc.getArguments())); + }, () -> log.warn("This service does not provide any queues to delete")); + } + + + @Override + public Class getEventClass() { + + return TenantResponse.class; + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/AuthDetails.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/AuthDetails.java index c4a255a..c76fb22 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/model/AuthDetails.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/AuthDetails.java @@ -4,7 +4,6 @@ import java.util.HashSet; import java.util.Set; import com.fasterxml.jackson.annotation.JsonAlias; - import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/ITenantEvent.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/ITenantEvent.java new file mode 100644 index 0000000..0391a95 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/ITenantEvent.java @@ -0,0 +1,7 @@ +package com.knecon.fforesight.tenantcommons.model; + +public interface ITenantEvent { + + String getTenantId(); + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java index 43260f4..01f53e4 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantCreatedEvent.java @@ -7,7 +7,7 @@ import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor -public class TenantCreatedEvent { +public class TenantCreatedEvent implements ITenantEvent { private String tenantId; diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueProvider.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueProvider.java new file mode 100644 index 0000000..20efe3e --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantQueueProvider.java @@ -0,0 +1,14 @@ +package com.knecon.fforesight.tenantcommons.model; + +import java.util.Set; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public class TenantQueueProvider { + + public final Set tenantQueueConfigurations; + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantResponse.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantResponse.java index d560fd3..c91db3b 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantResponse.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantResponse.java @@ -12,7 +12,7 @@ import lombok.NoArgsConstructor; @Builder @AllArgsConstructor @NoArgsConstructor -public class TenantResponse { +public class TenantResponse implements ITenantEvent { private String tenantId; private String displayName; diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantSyncEvent.java b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantSyncEvent.java new file mode 100644 index 0000000..f6d0ced --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/model/TenantSyncEvent.java @@ -0,0 +1,14 @@ +package com.knecon.fforesight.tenantcommons.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class TenantSyncEvent implements ITenantEvent { + + private String tenantId; + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java index 80b8381..88f8d83 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeService.java @@ -6,6 +6,7 @@ public interface RabbitQueueFromExchangeService { void addNewQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments); + void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments); diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java index 86eeeb8..1698aeb 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java @@ -52,6 +52,19 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan } + private static Binding getBinding(String exchangeName, String routingKey, String queueName) { + + return new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); + } + + + private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) { + + log.debug("Getting message listener container by id {}", listenerId); + return ((AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(listenerId)); + } + + @Override public void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments) { @@ -69,16 +82,10 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan } - private static Binding getBinding(String exchangeName, String routingKey, String queueName) { - - return new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); - } - - @Override public void addQueueToListener(String listenerId, String queueName) { - log.info("Adding queue " + queueName + " to listener " + listenerId); + log.info("Adding queue {} to listener {}", queueName, listenerId); if (!checkQueueExistOnListener(listenerId, queueName)) { getMessageListenerContainerById(listenerId).addQueueNames(queueName); } @@ -88,7 +95,7 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan @Override public void removeQueueFromListener(String listenerId, String queueName) { - log.info("Removing queue " + queueName + " from listener " + listenerId); + log.info("Removing queue {} from listener {}", queueName, listenerId); if (checkQueueExistOnListener(listenerId, queueName)) { getMessageListenerContainerById(listenerId).removeQueueNames(queueName); @@ -100,23 +107,16 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan @Override public boolean checkQueueExistOnListener(String listenerId, String queueName) { - log.debug("Checking if queue " + queueName + " exists on listener " + listenerId); + log.debug("Checking if queue {} exists on listener {}", queueName, listenerId); String[] queueNames = getMessageListenerContainerById(listenerId).getQueueNames(); for (String name : queueNames) { if (name.equals(queueName)) { - log.debug("Queue with name : " + queueName + " already exists on listener"); + log.debug("Queue with name : {} already exists on listener", queueName); return true; } } - log.debug("Queue with name : " + queueName + " does not exist on listener"); + log.debug("Queue with name : {} does not exist on listener", queueName); return false; } - - private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) { - - log.debug("Getting message listener container by id " + listenerId); - return ((AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(listenerId)); - } - } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantEventsLock.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantEventsLock.java new file mode 100644 index 0000000..98dd28f --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantEventsLock.java @@ -0,0 +1,23 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import java.util.concurrent.Semaphore; + +import lombok.SneakyThrows; + +public class TenantEventsLock { + + private final static Semaphore semaphore = new Semaphore(1); + + + @SneakyThrows + public static void executeInLock(Runnable runnable) { + + try { + semaphore.acquire(); + runnable.run(); + } finally { + semaphore.release(); + } + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java deleted file mode 100644 index faa2381..0000000 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantExchangeMessageReceiver.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.knecon.fforesight.tenantcommons.queue; - -import java.util.Set; -import java.util.concurrent.Semaphore; - -import com.knecon.fforesight.tenantcommons.TenantProvider; -import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; -import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration; -import com.knecon.fforesight.tenantcommons.model.TenantResponse; - -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import lombok.experimental.FieldDefaults; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@RequiredArgsConstructor -@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) -public abstract class TenantExchangeMessageReceiver { - - RabbitQueueFromExchangeService rabbitQueueService; - TenantProvider tenantProvider; - Semaphore semaphore = new Semaphore(1); - - - protected abstract Set getTenantQueueConfigs(); - - - @SneakyThrows - public void initializeQueues() { - - try { - semaphore.acquire(); - - log.info("Initializing queues for all tenants."); - tenantProvider.getTenants() - .forEach(tenant -> { - getTenantQueueConfigs().parallelStream() - .forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), - tqc.getQueuePrefix(), - tqc.getExchangeName(), - tenant.getTenantId(), - tqc.getDlqName(), - tqc.getArguments())); - log.info("Initialized queues for tenant " + tenant.getTenantId()); - }); - } finally { - semaphore.release(); - } - } - - - @SneakyThrows - public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) { - - try { - semaphore.acquire(); - - String tenantId = tenantCreatedEvent.getTenantId(); - log.info("Creating queues for new tenant " + tenantId); - getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), - tqc.getQueuePrefix(), - tqc.getExchangeName(), - tenantId, - tqc.getDlqName(), - tqc.getArguments())); - } finally { - semaphore.release(); - } - } - - - @SneakyThrows - public void reactToTenantDeletion(TenantResponse tenantResponse) { - - try { - semaphore.acquire(); - - String tenantId = tenantResponse.getTenantId(); - log.info("Removing queues for deleted tenant " + tenantId); - getTenantQueueConfigs().forEach(tqc -> rabbitQueueService.deleteQueue(tqc.getListenerId(), - tqc.getQueuePrefix(), - tqc.getExchangeName(), - tenantId, - tqc.getDlqName(), - tqc.getArguments())); - } finally { - semaphore.release(); - } - - } - -} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java index 0d8cdab..6c7b65b 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantMessagingConfiguration.java @@ -1,29 +1,53 @@ package com.knecon.fforesight.tenantcommons.queue; +import static com.knecon.fforesight.tenantcommons.queue.TenantRabbitListener.TENANT_CREATED_LISTENER; +import static com.knecon.fforesight.tenantcommons.queue.TenantRabbitListener.TENANT_DELETED_LISTENER; +import static com.knecon.fforesight.tenantcommons.queue.TenantRabbitListener.TENANT_SYNC_LISTENER; + import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; -import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; -@NoArgsConstructor -public abstract class TenantMessagingConfiguration { +@Configuration +@RequiredArgsConstructor +public class TenantMessagingConfiguration { - @Value("${POD_NAME:}") - private String podName; - - private static final String TENANT_EVENTS_DLQ_SUFFIX = "_tenant_events_error"; - private static final String TENANT_CREATED_QUEUE_SUFFIX = "_tenant_created"; - private static final String TENANT_DELETED_QUEUE_SUFFIX = "_tenant_deleted"; + public static final String TENANT_CREATED_QUEUE = "tenantCreatedQueue"; + public static final String TENANT_DELETED_QUEUE = "tenantDeletedQueue"; + public static final String TENANT_SYNC_QUEUE = "tenantSyncQueue"; + public static final String TENANT_EXCHANGE = "tenantExchange"; // time in ms after which a deletion will be executed when no consumer is present // see: https://www.rabbitmq.com/docs/ttl#queue-ttl public static final int QUEUE_EXPIRATION_TIME = 300000; // 5 minutes + private static final String TENANT_EVENTS_DLQ_SUFFIX = "_tenant_events_error"; + private static final String TENANT_CREATED_QUEUE_SUFFIX = "_tenant_created"; + private static final String TENANT_DELETED_QUEUE_SUFFIX = "_tenant_deleted"; + private static final String TENANT_SYNC_QUEUE_SUFFIX = "_tenant_sync"; + private final RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; + @Value("${POD_NAME:}") + private String podName; + + + @EventListener(ApplicationReadyEvent.class) + public void assignQueuesToListeners() { + + rabbitListenerEndpointRegistry.getListenerContainer(TENANT_CREATED_LISTENER).setQueueNames(getTenantCreatedQueueName()); + rabbitListenerEndpointRegistry.getListenerContainer(TENANT_DELETED_LISTENER).setQueueNames(getTenantDeletedQueueName()); + rabbitListenerEndpointRegistry.getListenerContainer(TENANT_SYNC_LISTENER).setQueueNames(getTenantSyncQueueName()); + + } public String getTenantCreatedQueueName() { @@ -38,6 +62,48 @@ public abstract class TenantMessagingConfiguration { } + public String getTenantSyncQueueName() { + + return this.getQueueNameWithSuffix(TENANT_SYNC_QUEUE_SUFFIX); + } + + + @Bean(name = TENANT_EXCHANGE) + public TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name:tenants-exchange}") String tenantExchangeName) { + + return new TopicExchange(tenantExchangeName); + } + + + @Bean(TENANT_CREATED_QUEUE) + public Queue tenantCreatedQueue() { + + return QueueBuilder.durable(getTenantCreatedQueueName()) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", getTenantEventsDLQName()) + .withArgument("x-expires", QUEUE_EXPIRATION_TIME) + .build(); + } + + + @Bean(TENANT_DELETED_QUEUE) + public Queue tenantDeletedQueue() { + + return QueueBuilder.durable(getTenantDeletedQueueName()) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", getTenantEventsDLQName()) + .withArgument("x-expires", QUEUE_EXPIRATION_TIME) + .build(); + } + + + @Bean + public Queue tenantEventsDLQ() { + + return QueueBuilder.durable(getTenantEventsDLQName()).withArgument("x-expires", QUEUE_EXPIRATION_TIME).build(); + } + + public String getTenantEventsDLQName() { return getQueueNameWithSuffix(TENANT_EVENTS_DLQ_SUFFIX); @@ -68,53 +134,36 @@ public abstract class TenantMessagingConfiguration { } - @Bean(name = "tenantExchange") - public TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name:tenants-exchange}") String tenantExchangeName) { - - return new TopicExchange(tenantExchangeName); - } - - - @Bean("tenantCreatedQueue") - public Queue tenantCreatedQueue() { - - return QueueBuilder.durable(getTenantCreatedQueueName()) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", getTenantEventsDLQName()) - .withArgument("x-expires", QUEUE_EXPIRATION_TIME) - .build(); - } - - - @Bean("tenantDeletedQueue") - public Queue tenantDeletedQueue() { - - return QueueBuilder.durable(getTenantDeletedQueueName()) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", getTenantEventsDLQName()) - .withArgument("x-expires", QUEUE_EXPIRATION_TIME) - .build(); - } - - @Bean - public Queue tenantEventsDLQ() { - - return QueueBuilder.durable(getTenantEventsDLQName()).withArgument("x-expires", QUEUE_EXPIRATION_TIME).build(); - } - - - @Bean - public Binding tenantCreatedBinding(@Qualifier("tenantCreatedQueue") Queue tenantCreatedQueue, @Qualifier("tenantExchange") TopicExchange tenantExchange) { + public Binding tenantCreatedBinding(@Qualifier(TENANT_CREATED_QUEUE) Queue tenantCreatedQueue, @Qualifier(TENANT_EXCHANGE) TopicExchange tenantExchange) { return BindingBuilder.bind(tenantCreatedQueue).to(tenantExchange).with("tenant.created"); + } @Bean - public Binding tenantDeletedBinding(@Qualifier("tenantDeletedQueue") Queue tenantDeletedQueue, @Qualifier("tenantExchange") TopicExchange tenantExchange) { + public Binding tenantDeletedBinding(@Qualifier(TENANT_DELETED_QUEUE) Queue tenantDeletedQueue, @Qualifier(TENANT_EXCHANGE) TopicExchange tenantExchange) { return BindingBuilder.bind(tenantDeletedQueue).to(tenantExchange).with("tenant.delete"); } + + @Bean({TENANT_SYNC_QUEUE}) + public Queue tenantSyncQueue() { + + return QueueBuilder.durable(this.getTenantSyncQueueName()) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", this.getTenantEventsDLQName()) + .withArgument("x-expires", 300000) + .build(); + } + + + @Bean + public Binding tenantSyncBinding(@Qualifier(TENANT_SYNC_QUEUE) Queue tenantCreatedQueue, @Qualifier(TENANT_EXCHANGE) TopicExchange tenantExchange) { + + return BindingBuilder.bind(tenantCreatedQueue).to(tenantExchange).with("tenant.sync"); + } + } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantQueueInitializer.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantQueueInitializer.java new file mode 100644 index 0000000..da797e5 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantQueueInitializer.java @@ -0,0 +1,54 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import java.util.Optional; + +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.tenantcommons.TenantProvider; +import com.knecon.fforesight.tenantcommons.model.TenantQueueProvider; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +@Service +public class TenantQueueInitializer { + + private final Optional tenantQueueProvider; + private final TenantProvider tenantProvider; + private final RabbitQueueFromExchangeService rabbitQueueService; + + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + + initializeQueues(); + } + + + @SneakyThrows + public void initializeQueues() { + + tenantQueueProvider.ifPresent(tenantQueueProvider -> { + log.info("Initializing queues for all tenants."); + TenantEventsLock.executeInLock(() -> { + tenantProvider.getTenants() + .forEach(tenant -> { + tenantQueueProvider.getTenantQueueConfigurations().parallelStream() + .forEach(tqc -> rabbitQueueService.addNewQueue(tqc.getListenerId(), + tqc.getQueuePrefix(), + tqc.getExchangeName(), + tenant.getTenantId(), + tqc.getDlqName(), + tqc.getArguments())); + log.info("Initialized queues for tenant {}", tenant.getTenantId()); + }); + }); + }); + + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantRabbitListener.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantRabbitListener.java new file mode 100644 index 0000000..34bd745 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/TenantRabbitListener.java @@ -0,0 +1,83 @@ +package com.knecon.fforesight.tenantcommons.queue; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.tenantcommons.listener.ITenantEventHandler; +import com.knecon.fforesight.tenantcommons.model.ITenantEvent; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.model.TenantSyncEvent; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TenantRabbitListener { + + public static final String TENANT_CREATED_LISTENER = "tenantCreatedListener"; + public static final String TENANT_DELETED_LISTENER = "tenantDeletedListener"; + public static final String TENANT_SYNC_LISTENER = "tenantSyncListener"; + private final List eventListeners; + + + @SneakyThrows + @RabbitHandler + @RabbitListener(id = TENANT_CREATED_LISTENER) + public void listenOnTenantCreated(TenantCreatedEvent tenantCreatedEvent) { + + this.processTenantEvent(tenantCreatedEvent); + + } + + + @SneakyThrows + private void processTenantEvent(T event) { + + TenantEventsLock.executeInLock(() -> { + List exceptions = new ArrayList<>(); + eventListeners.stream() + .filter(listener -> listener.getEventClass().equals(event.getClass())) + .forEach(listener -> { + log.info("Processing listener {} for {} event {}", listener.getClass().getSimpleName(), event.getClass().getSimpleName(), event.getTenantId()); + try { + listener.handle(event); + } catch (Exception e) { + log.error("Error handling event {} at listener: {}", event.getTenantId(), listener.getEventClass().getSimpleName(), e); + exceptions.add(e); + } + }); + if (!exceptions.isEmpty()) { + log.error("Some listeners ({}) failed to handle tenant created event {}", exceptions.size(), event.getTenantId()); + // TODO determine if we should do something if some listeners failed ... + } + }); + } + + + @SneakyThrows + @RabbitHandler + @RabbitListener(id = TENANT_DELETED_LISTENER) + public void listenOnTenantDeleted(TenantResponse tenantResponse) { + + this.processTenantEvent(tenantResponse); + + } + + + @SneakyThrows + @RabbitHandler + @RabbitListener(id = TENANT_SYNC_LISTENER) + public void listenOnTenantSync(TenantSyncEvent tenantSyncEvent) { + + this.processTenantEvent(tenantSyncEvent); + + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java b/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java index b11536c..8a27e49 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/task/ApplicationTaskExecutorBeanConfig.java @@ -19,7 +19,8 @@ public class ApplicationTaskExecutorBeanConfig { @Bean(name = {APPLICATION_TASK_EXECUTOR_BEAN_NAME, DEFAULT_TASK_EXECUTOR_BEAN_NAME}) @ConditionalOnMissingBean(name = {APPLICATION_TASK_EXECUTOR_BEAN_NAME, DEFAULT_TASK_EXECUTOR_BEAN_NAME}) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { + return builder.build(); } -} \ No newline at end of file +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/task/KneconTaskDecorator.java b/src/main/java/com/knecon/fforesight/tenantcommons/task/KneconTaskDecorator.java index f47d0c3..0167afb 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/task/KneconTaskDecorator.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/task/KneconTaskDecorator.java @@ -4,4 +4,5 @@ import org.springframework.core.task.TaskDecorator; // Marker interface public interface KneconTaskDecorator extends TaskDecorator { + } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskDecoratorConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskDecoratorConfiguration.java index b95a31e..cde7f92 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskDecoratorConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskDecoratorConfiguration.java @@ -1,10 +1,11 @@ package com.knecon.fforesight.tenantcommons.task; -import com.knecon.fforesight.tenantcommons.TenantAwareTaskDecorator; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import com.knecon.fforesight.tenantcommons.TenantAwareTaskDecorator; + @Configuration(proxyBeanMethods = false) public class TaskDecoratorConfiguration { @@ -14,4 +15,5 @@ public class TaskDecoratorConfiguration { return new TenantAwareTaskDecorator(); } + } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskExecutionConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskExecutionConfiguration.java index 908b6ba..d9e2238 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskExecutionConfiguration.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/task/TaskExecutionConfiguration.java @@ -26,8 +26,10 @@ public class TaskExecutionConfiguration { @Value("${spring.task.execution.pool.warnPercentageThreshold:80}") private int executorWarnPercentageThreshold; + @Bean public TaskExecutorCustomizer taskExecutorCustomizer(ObjectProvider taskDecorator) { + return taskExecutor -> { taskExecutor.setTaskDecorator((KneconTaskDecorator) runnable -> { val taskDecoratorUsed = taskDecorator.getIfUnique(IDENTITY_KNECON_TASK_DECORATOR_SUPPLIER); @@ -37,11 +39,12 @@ public class TaskExecutionConfiguration { if (queueOccupancyPercentage >= executorWarnPercentageThreshold) { log.warn("Executor pool [ " + taskExecutor + " ] queue size reached " + actualQueueSize + "/" + taskExecutor.getMaxPoolSize() + - " entries awaiting execution triggering the warn level set for " + executorWarnPercentageThreshold +"% occupancy."); + " entries awaiting execution triggering the warn level set for " + executorWarnPercentageThreshold + "% occupancy."); } return decoratedRunnable; }); }; } + } diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/utils/MongoConnectionStringHelper.java b/src/main/java/com/knecon/fforesight/tenantcommons/utils/MongoConnectionStringHelper.java index a45d40a..c67226c 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/utils/MongoConnectionStringHelper.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/utils/MongoConnectionStringHelper.java @@ -1,7 +1,6 @@ package com.knecon.fforesight.tenantcommons.utils; import com.knecon.fforesight.tenantcommons.model.MongoDBConnection; - import lombok.experimental.UtilityClass; @UtilityClass diff --git a/src/test/java/com/knecon/fforesight/tenantcommons/BasicIntegrationTest.java b/src/test/java/com/knecon/fforesight/tenantcommons/BasicIntegrationTest.java new file mode 100644 index 0000000..40b79ea --- /dev/null +++ b/src/test/java/com/knecon/fforesight/tenantcommons/BasicIntegrationTest.java @@ -0,0 +1,81 @@ +package com.knecon.fforesight.tenantcommons; + +import java.util.Collections; + +import org.springframework.amqp.core.DirectExchange; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.util.TestPropertyValues; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; + +import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration; +import com.knecon.fforesight.tenantcommons.model.TenantQueueProvider; +import com.knecon.fforesight.tenantcommons.utils.TestMessageService; +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.containers.RabbitMQContainer; + +@Slf4j +@ContextConfiguration(initializers = {BasicIntegrationTest.Initializer.class}) +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) +@ImportAutoConfiguration({MultiTenancyAutoConfiguration.class}) +@SuppressWarnings("PMD.TestClassWithoutTestCases") +@SpringBootTest +public abstract class BasicIntegrationTest { + + static class Initializer implements ApplicationContextInitializer { + + @SuppressWarnings("PMD") + public void initialize(ConfigurableApplicationContext configurableApplicationContext) { + + RabbitMQContainer rabbitContainer = new RabbitMQContainer("rabbitmq:3.12"); + + rabbitContainer.start(); + log.info("Rabbit container started and available at {}", rabbitContainer.getHttpUrl()); + + TestPropertyValues.of("RABBITMQ_USERNAME=" + rabbitContainer.getAdminUsername(), + "RABBITMQ_PASSWORD=" + rabbitContainer.getAdminPassword(), + "RABBITMQ_HOST=" + rabbitContainer.getHost(), + "RABBITMQ_PORT=" + rabbitContainer.getAmqpPort(), + "POD_NAME=tenant-commons" + ).applyTo(configurableApplicationContext.getEnvironment()); + } + + } + + @SpringBootApplication + static class TestConfiguration { + + @Bean + public DirectExchange testExchange() { + + return new DirectExchange("testExchange"); + } + + + @Bean + public TenantQueueProvider tenantQueueProvider() { + + var testQueue = TenantQueueConfiguration.builder() + .listenerId("testListener") + .exchangeName("testExchange") + .queuePrefix("test") + .dlqName("test_error") + .build(); + return new TenantQueueProvider(Collections.singleton(testQueue)); + } + + + @Bean + public TestMessageService testMessageService() { + + return new TestMessageService(); + } + + } + +} diff --git a/src/test/java/com/knecon/fforesight/tenantcommons/TenantQueuesIntegrationTest.java b/src/test/java/com/knecon/fforesight/tenantcommons/TenantQueuesIntegrationTest.java new file mode 100644 index 0000000..5241859 --- /dev/null +++ b/src/test/java/com/knecon/fforesight/tenantcommons/TenantQueuesIntegrationTest.java @@ -0,0 +1,79 @@ +package com.knecon.fforesight.tenantcommons; + +import static com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration.TENANT_EXCHANGE; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.utils.TestMessage; +import com.knecon.fforesight.tenantcommons.utils.TestMessageService; +import com.knecon.fforesight.tenantcommons.utils.TestTenantProvider; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +public class TenantQueuesIntegrationTest extends BasicIntegrationTest { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private TestTenantProvider tenantProvider; + + @Autowired + private TestMessageService testMessageService; + + @Autowired + private RabbitAdmin rabbitAdmin; + + @Value("${fforesight.tenant-exchange.name:tenants-exchange}") + String tenantExchangeName; + + + @Test + @SneakyThrows + public void testTenantCreatedQueue() { + + TenantContext.setTenantId("tenant1"); + // create a new tenant + rabbitTemplate.convertAndSend(tenantExchangeName, "tenant.created", new TenantCreatedEvent("tenant1")); + // wait for message to send and be processed + int iterations = 0; + do { + iterations++; + Thread.sleep(1000); + if (iterations > 10) { + fail("Tenant not created within 10 seconds"); + } + if (!tenantProvider.getTenants().isEmpty()) { + assertThat(tenantProvider.getTenant("tenant1")).isNotNull(); + break; + } + } while (true); + + + rabbitTemplate.convertAndSend("testExchange", "tenant1", new TestMessage("Hello Tenant Commons!")); + + iterations = 0; + do { + iterations++; + Thread.sleep(1000); + if (iterations > 10) { + fail("Message not sent within 10 seconds"); + } + if (!testMessageService.getMessages().isEmpty()) { + + break; + } + } while (true); + + assertThat(testMessageService.getMessages().size()).isEqualTo(1); + assertThat(testMessageService.getMessages().get(0).getMessage()).isEqualTo("Hello Tenant Commons!"); + } + +} diff --git a/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestMessage.java b/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestMessage.java new file mode 100644 index 0000000..d658e95 --- /dev/null +++ b/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestMessage.java @@ -0,0 +1,13 @@ +package com.knecon.fforesight.tenantcommons.utils; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class TestMessage { + + private String message; +} diff --git a/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestMessageService.java b/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestMessageService.java new file mode 100644 index 0000000..14ad357 --- /dev/null +++ b/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestMessageService.java @@ -0,0 +1,44 @@ +package com.knecon.fforesight.tenantcommons.utils; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; + +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestMessageService { + + public List messages = new ArrayList<>(); + + + @PostConstruct + protected void init() { + + log.info("TestMessageService initialized"); + } + + @RabbitHandler + @RabbitListener(id = "testListener") + public void handleTestMessage(TestMessage message) { + + log.info("Received message: {}", message); + addMessage(message); + } + + + public void addMessage(TestMessage message) { + + messages.add(message); + } + + + public List getMessages() { + + return messages; + } + +} diff --git a/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestTenantProvider.java b/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestTenantProvider.java new file mode 100644 index 0000000..3baa687 --- /dev/null +++ b/src/test/java/com/knecon/fforesight/tenantcommons/utils/TestTenantProvider.java @@ -0,0 +1,56 @@ +package com.knecon.fforesight.tenantcommons.utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.tenantcommons.TenantProvider; +import com.knecon.fforesight.tenantcommons.listener.ITenantEventHandler; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.model.UpdateDetailsRequest; + +@Service +public class TestTenantProvider implements TenantProvider, ITenantEventHandler { + + private List tenants = new ArrayList<>(); + + + @Override + public void updateDetails(String s, UpdateDetailsRequest updateDetailsRequest) { + //noop + } + + + @Override + public TenantResponse getTenant(String s) { + + return tenants.stream().filter(t -> t.getTenantId().equals(s)).findFirst().orElse(null); + } + + + @Override + public List getTenants() { + + return tenants; + } + + + @Override + public void handle(TenantCreatedEvent event) { + + tenants.add(TenantResponse.builder() + .tenantId(event.getTenantId()) + .build()); + } + + + @Override + public Class getEventClass() { + + return TenantCreatedEvent.class; + } + +} diff --git a/src/test/resources/application.yaml b/src/test/resources/application.yaml new file mode 100644 index 0000000..a2217dc --- /dev/null +++ b/src/test/resources/application.yaml @@ -0,0 +1,19 @@ +spring: + application: + name: tenant-commons-tester + rabbitmq: + host: ${RABBITMQ_HOST:localhost} + port: ${RABBITMQ_PORT:5672} + username: ${RABBITMQ_USERNAME:user} + password: ${RABBITMQ_PASSWORD:rabbitmq} + listener: + simple: + acknowledge-mode: AUTO + concurrency: 5 + retry: + enabled: true + max-attempts: 3 + max-interval: 15000 + prefetch: 1 + +logging.type: ${LOGGING_TYPE:CONSOLE} diff --git a/src/test/resources/logback-spring.xml b/src/test/resources/logback-spring.xml new file mode 100644 index 0000000..1d0677d --- /dev/null +++ b/src/test/resources/logback-spring.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/src/test/resources/testcontainers.properties b/src/test/resources/testcontainers.properties new file mode 100644 index 0000000..ead058a --- /dev/null +++ b/src/test/resources/testcontainers.properties @@ -0,0 +1 @@ +hub.image.name.prefix=docker-dev.knecon.com/tests/