Merge branch 'log-rabbit-listener-errors' into 'main'
Listener Logging and DLQ fix See merge request fforesight/tenant-commons!19
This commit is contained in:
commit
f1b0bf453a
@ -0,0 +1,34 @@
|
||||
package com.knecon.fforesight.tenantcommons;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class RabbitConfiguration {
|
||||
|
||||
@Bean
|
||||
public BeanPostProcessor rabbitListenerContainerFactoryPostProcessor() {
|
||||
|
||||
return new BeanPostProcessor() {
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException {
|
||||
|
||||
if (bean instanceof SimpleRabbitListenerContainerFactory factory) {
|
||||
factory.setErrorHandler(t -> {
|
||||
log.error("Error occurred in Rabbit listener: ", t);
|
||||
});
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
@ -31,7 +31,8 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
||||
|
||||
String queueName = queueNamePrefix + "_" + routingKey;
|
||||
Queue queue = new Queue(queueName, true, false, false);
|
||||
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
|
||||
addArgumentsToQueue(queue, dlq, arguments);
|
||||
Binding binding = getBinding(exchangeName, routingKey, queueName);
|
||||
|
||||
String returnedQueueName = rabbitAdmin.declareQueue(queue);
|
||||
log.debug("Declared queue {}", returnedQueueName);
|
||||
@ -41,12 +42,22 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
||||
}
|
||||
|
||||
|
||||
private void addArgumentsToQueue(Queue queue, String dlq, Map<String, Object> arguments) {
|
||||
|
||||
if (dlq != null && !dlq.isBlank()) {
|
||||
queue.addArgument("x-dead-letter-exchange", "");
|
||||
queue.addArgument("x-dead-letter-routing-key", dlq);
|
||||
arguments.forEach(queue::addArgument);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map<String, Object> arguments) {
|
||||
|
||||
String queueName = queueNamePrefix + "_" + routingKey;
|
||||
removeQueueFromListener(listenerId, queueName);
|
||||
Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments);
|
||||
Binding binding = getBinding(exchangeName, routingKey, queueName);
|
||||
rabbitAdmin.removeBinding(binding);
|
||||
log.debug("Removed binding");
|
||||
if (rabbitAdmin.deleteQueue(queueName)) {
|
||||
@ -58,15 +69,9 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan
|
||||
}
|
||||
|
||||
|
||||
private static Binding getBinding(String exchangeName, String routingKey, String dlq, String queueName, Map<String, Object> arguments) {
|
||||
private static Binding getBinding(String exchangeName, String routingKey, String queueName) {
|
||||
|
||||
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
|
||||
if (dlq != null && !dlq.isBlank()) {
|
||||
binding.addArgument("x-dead-letter-exchange", "");
|
||||
binding.addArgument("x-dead-letter-routing-key", dlq);
|
||||
arguments.forEach(binding::addArgument);
|
||||
}
|
||||
return binding;
|
||||
return new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user