Merge branch 'clean-extendible-messaging-config' into 'main'

Removed the BeanPostProcessor raw auto-config in favor of a more extendible approach

See merge request fforesight/tenant-commons!6
This commit is contained in:
Timo Bejan 2023-11-27 21:52:16 +01:00
commit e1221cc05d
2 changed files with 59 additions and 32 deletions

View File

@ -2,14 +2,20 @@ package com.knecon.fforesight.tenantcommons;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateCustomizer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import static com.knecon.fforesight.tenantcommons.ForwardTenantInterceptor.TENANT_HEADER_NAME;
@ -17,47 +23,59 @@ import static com.knecon.fforesight.tenantcommons.ForwardTenantInterceptor.TENAN
@ConditionalOnClass(RabbitTemplate.class)
public class MultiTenancyMessagingConfiguration {
/**
* Spring boot-autoconfigure 3.14 only allows for one {@link ContainerCustomizer<SimpleMessageListenerContainer>}
* to be defined. If 2 or more are defined instead, nothing ends up customizing the {@link SimpleRabbitListenerContainerFactory}
* (for more details, see RabbitAnnotationDrivenConfiguration::simpleRabbitListenerContainerFactory).
* To bypass this limitation, we define and inject our own {@link SimpleMessageListenerContainerCustomizer} which allows
* us to define as many container customization points as needed.
*/
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
@Primary
public ContainerCustomizer<SimpleMessageListenerContainer> simpleMessageListenerContainerCustomizer(
ObjectProvider<SimpleMessageListenerContainerCustomizer> customizers) {
return container -> customizers.orderedStream().forEach((customizer) -> customizer.customize(container));
}
@Bean
public MessageConverter producerJackson2MessageConverter() {
ObjectMapper mapper = new ObjectMapper().findAndRegisterModules();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Jackson2JsonMessageConverter(mapper);
}
@Bean
public RabbitTemplateCustomizer rabbitTemplatePublishTenantIdHeaderCustomizer(
@Qualifier("tenantIdSetterPostProcessor") MessagePostProcessor messagePostProcessor) {
return template -> template.addBeforePublishPostProcessors(messagePostProcessor);
}
@Bean
public static BeanPostProcessor multitenancyBeanPostProcessor() {
public SimpleMessageListenerContainerCustomizer rabbitInterceptTenantIdHeaderCustomizer(
@Qualifier("tenantIdGetterPostProcessor") MessagePostProcessor messagePostProcessor) {
return container -> container.addAfterReceivePostProcessors(messagePostProcessor);
}
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
m.getMessageProperties().setHeader(TENANT_HEADER_NAME, TenantContext.getTenantId());
return m;
});
} else if (bean instanceof AbstractRabbitListenerContainerFactory) {
((AbstractRabbitListenerContainerFactory<?>) bean).setAfterReceivePostProcessors(m -> {
String tenant = m.getMessageProperties().getHeader(TENANT_HEADER_NAME);
if (tenant != null) {
TenantContext.setTenantId(tenant);
} else {
throw new RuntimeException("No Tenant is set queue message");
}
return m;
});
}
return bean;
}
@Bean
public MessagePostProcessor tenantIdSetterPostProcessor() {
return message -> {
message.getMessageProperties().setHeader(TENANT_HEADER_NAME, TenantContext.getTenantId());
return message;
};
}
@Bean
public MessagePostProcessor tenantIdGetterPostProcessor() {
return message -> {
String tenant = message.getMessageProperties().getHeader(TENANT_HEADER_NAME);
if (tenant != null) {
TenantContext.setTenantId(tenant);
} else {
throw new RuntimeException("No Tenant is set queue message");
}
return message;
};
}
}

View File

@ -0,0 +1,9 @@
package com.knecon.fforesight.tenantcommons;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@FunctionalInterface
public interface SimpleMessageListenerContainerCustomizer {
void customize(SimpleMessageListenerContainer simpleMessageListenerContainer);
}