From 7985dc73706175bab75a68725a1527fa8682c14e Mon Sep 17 00:00:00 2001 From: Maverick Studer Date: Tue, 3 Sep 2024 12:08:11 +0200 Subject: [PATCH] Listener Logging and DLQ fix --- .../tenantcommons/RabbitConfiguration.java | 34 +++++++++++++++++++ .../RabbitQueueFromExchangeServiceImpl.java | 27 +++++++++------ 2 files changed, 50 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java b/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java new file mode 100644 index 0000000..d7edff4 --- /dev/null +++ b/src/main/java/com/knecon/fforesight/tenantcommons/RabbitConfiguration.java @@ -0,0 +1,34 @@ +package com.knecon.fforesight.tenantcommons; + +import org.jetbrains.annotations.NotNull; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import lombok.extern.slf4j.Slf4j; + +@Configuration +@Slf4j +public class RabbitConfiguration { + + @Bean + public BeanPostProcessor rabbitListenerContainerFactoryPostProcessor() { + + return new BeanPostProcessor() { + @Override + public Object postProcessAfterInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException { + + if (bean instanceof SimpleRabbitListenerContainerFactory factory) { + factory.setErrorHandler(t -> { + log.error("Error occurred in Rabbit listener: ", t); + }); + } + return bean; + } + }; + } + +} diff --git a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java index 3f116a3..86eeeb8 100644 --- a/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java +++ b/src/main/java/com/knecon/fforesight/tenantcommons/queue/RabbitQueueFromExchangeServiceImpl.java @@ -31,7 +31,8 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan String queueName = queueNamePrefix + "_" + routingKey; Queue queue = new Queue(queueName, true, false, false); - Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments); + addArgumentsToQueue(queue, dlq, arguments); + Binding binding = getBinding(exchangeName, routingKey, queueName); String returnedQueueName = rabbitAdmin.declareQueue(queue); log.debug("Declared queue {}", returnedQueueName); @@ -41,15 +42,25 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan } + private void addArgumentsToQueue(Queue queue, String dlq, Map arguments) { + + if (dlq != null && !dlq.isBlank()) { + queue.addArgument("x-dead-letter-exchange", ""); + queue.addArgument("x-dead-letter-routing-key", dlq); + arguments.forEach(queue::addArgument); + } + } + + @Override public void deleteQueue(String listenerId, String queueNamePrefix, String exchangeName, String routingKey, String dlq, Map arguments) { String queueName = queueNamePrefix + "_" + routingKey; removeQueueFromListener(listenerId, queueName); - Binding binding = getBinding(exchangeName, routingKey, dlq, queueName, arguments); + Binding binding = getBinding(exchangeName, routingKey, queueName); rabbitAdmin.removeBinding(binding); log.debug("Removed binding"); - if(rabbitAdmin.deleteQueue(queueName)) { + if (rabbitAdmin.deleteQueue(queueName)) { log.info("Deleted queue {} successfully", queueName); } else { log.info("Queue deletion failed"); @@ -58,15 +69,9 @@ public class RabbitQueueFromExchangeServiceImpl implements RabbitQueueFromExchan } - private static Binding getBinding(String exchangeName, String routingKey, String dlq, String queueName, Map arguments) { + private static Binding getBinding(String exchangeName, String routingKey, String queueName) { - Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); - if (dlq != null && !dlq.isBlank()) { - binding.addArgument("x-dead-letter-exchange", ""); - binding.addArgument("x-dead-letter-routing-key", dlq); - arguments.forEach(binding::addArgument); - } - return binding; + return new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); }