From 060f4c45690204ac1bddec30e8a50b8490237de8 Mon Sep 17 00:00:00 2001 From: deiflaender Date: Tue, 27 Sep 2022 12:45:58 +0200 Subject: [PATCH] RED-5151: Reject redelivered messages --- .../migration/MigrationStarterService.java | 8 ++----- .../server/queue/IndexingMessageReceiver.java | 21 +++++++++++++++++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/migration/MigrationStarterService.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/migration/MigrationStarterService.java index 440c4a0..20bf125 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/migration/MigrationStarterService.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/migration/MigrationStarterService.java @@ -6,8 +6,6 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.iqser.red.service.search.v1.model.IndexMessage; import com.iqser.red.service.search.v1.model.IndexMessageType; import com.iqser.red.service.search.v1.server.queue.IndexingMessageReceiver; @@ -25,21 +23,19 @@ public class MigrationStarterService { private final ApplicationContext ctx; private final IndexInformationService indexInformationService; private final IndexingMessageReceiver indexingMessageReceiver; - private final ObjectMapper objectMapper; private final SearchServiceSettings settings; @EventListener(ApplicationReadyEvent.class) - public void migrate() throws JsonProcessingException { + public void migrate() { // This can only run in post upgrade hook, because otherwise the old service is still runnnig. if (settings.isMigrateOnly()) { if (indexInformationService.hasIndexChanged()) { log.info("Index has changed and will be closed, dropped, recreated and all files will be indexed"); - String indexMessage = objectMapper.writeValueAsString(IndexMessage.builder() + indexingMessageReceiver.receiveIndexingRequest(IndexMessage.builder() .messageType(IndexMessageType.DROP) .build()); - indexingMessageReceiver.receiveIndexingRequest(indexMessage); } System.exit(SpringApplication.exit(ctx, () -> 0)); } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/queue/IndexingMessageReceiver.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/queue/IndexingMessageReceiver.java index d0ac321..65cd49c 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/queue/IndexingMessageReceiver.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/queue/IndexingMessageReceiver.java @@ -7,6 +7,8 @@ import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguratio import java.util.List; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -32,6 +34,7 @@ import com.iqser.red.service.search.v1.server.service.TextStorageService; import io.micrometer.core.annotation.Timed; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -53,11 +56,25 @@ public class IndexingMessageReceiver { private final IndexInformationService indexInformationService; + @SneakyThrows @RabbitHandler @RabbitListener(queues = INDEXING_QUEUE) - public void receiveIndexingRequest(String in) throws JsonProcessingException { + public void receiveIndexingRequest(Message message){ + + var indexRequest = objectMapper.readValue(message.getBody(), IndexMessage.class); + + // This prevents from endless retries oom errors. + if(message.getMessageProperties().isRedelivered()){ + throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with dossierId: %s and fileId: %s, do not retry.", indexRequest.getDossierId(), indexRequest.getFileId())); + } + + receiveIndexingRequest(indexRequest); + } + + + + public void receiveIndexingRequest(IndexMessage indexRequest) { - var indexRequest = objectMapper.readValue(in, IndexMessage.class); log.info("Processing indexing request: {}", indexRequest); switch (indexRequest.getMessageType()) {