From 0e71613ffcce147b2ae659e47ad6ab1c6405f0e7 Mon Sep 17 00:00:00 2001 From: deiflaender Date: Mon, 26 Sep 2022 12:52:39 +0200 Subject: [PATCH] RED-5151: Do not retry messages on oom errors --- .../v1/server/queue/MessageReceiver.java | 5 +++-- .../server/queue/PriorityMessageReceiver.java | 5 +++-- .../queue/RedactionMessageReceiver.java | 22 ++++++++++++++++--- .../AnalyseFileRealDataIntegrationTest.java | 9 +++----- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessageReceiver.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessageReceiver.java index 581ebdb2..754c74e9 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessageReceiver.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessageReceiver.java @@ -2,6 +2,7 @@ package com.iqser.red.service.redaction.v1.server.queue; import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_QUEUE; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -18,8 +19,8 @@ public class MessageReceiver { @RabbitHandler @RabbitListener(queues = REDACTION_QUEUE) - public void receiveAnalyzeRequest(String in) throws JsonProcessingException { - redactionMessageReceiver.receiveAnalyzeRequest(in, false); + public void receiveAnalyzeRequest(Message message) { + redactionMessageReceiver.receiveAnalyzeRequest(message, false); } } diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/PriorityMessageReceiver.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/PriorityMessageReceiver.java index 50340c69..98d9f3d2 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/PriorityMessageReceiver.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/PriorityMessageReceiver.java @@ -2,6 +2,7 @@ package com.iqser.red.service.redaction.v1.server.queue; import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_PRIORITY_QUEUE; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -19,9 +20,9 @@ public class PriorityMessageReceiver { @RabbitHandler @RabbitListener(queues = REDACTION_PRIORITY_QUEUE) - public void receiveAnalyzeRequest(String in) throws JsonProcessingException { + public void receiveAnalyzeRequest(Message message) { - redactionMessageReceiver.receiveAnalyzeRequest(in, true); + redactionMessageReceiver.receiveAnalyzeRequest(message, true); } } diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java index f5335eb4..e9861c83 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java @@ -2,6 +2,8 @@ package com.iqser.red.service.redaction.v1.server.queue; import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_DQL; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @@ -16,6 +18,7 @@ import com.iqser.red.service.redaction.v1.server.redaction.service.AnalyzeServic import com.iqser.red.service.redaction.v1.server.redaction.service.ManualRedactionSurroundingTextService; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -29,11 +32,24 @@ public class RedactionMessageReceiver { private final ManualRedactionSurroundingTextService manualRedactionSurroundingTextService; - public void receiveAnalyzeRequest(String in, boolean priority) throws JsonProcessingException { + @SneakyThrows + public void receiveAnalyzeRequest(Message message, boolean priority) { + + var analyzeRequest = objectMapper.readValue(message.getBody(), AnalyzeRequest.class); + + // This prevents from endless retries oom errors. + if(message.getMessageProperties().isRedelivered()){ + throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with dossierId: %s and fileId: %s, do not retry.", analyzeRequest.getDossierId(), analyzeRequest.getFileId())); + } + + receiveAnalyzeRequest(analyzeRequest, priority); + } + + @SneakyThrows + public void receiveAnalyzeRequest(AnalyzeRequest analyzeRequest, boolean priority) { - var analyzeRequest = objectMapper.readValue(in, AnalyzeRequest.class); log.info("Processing priority: {} analyze request for file: {}", priority, analyzeRequest.getFileId()); - AnalyzeResult result = null; + AnalyzeResult result; try { switch (analyzeRequest.getMessageType()) { diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/AnalyseFileRealDataIntegrationTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/AnalyseFileRealDataIntegrationTest.java index c1e0c469..ca98bf47 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/AnalyseFileRealDataIntegrationTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/AnalyseFileRealDataIntegrationTest.java @@ -64,8 +64,7 @@ public class AnalyseFileRealDataIntegrationTest extends LiveDataIntegrationTest log.info("No text file provided, Performing Structure analysis"); ar.setMessageType(MessageType.STRUCTURE_ANALYSE); - String in = om.writeValueAsString(ar); - redactionMessageReceiver.receiveAnalyzeRequest(in, false); + redactionMessageReceiver.receiveAnalyzeRequest(ar, false); } @@ -75,15 +74,13 @@ public class AnalyseFileRealDataIntegrationTest extends LiveDataIntegrationTest log.info("No redaction log provided, Performing full analysis"); ar.setMessageType(MessageType.ANALYSE); - String in = om.writeValueAsString(ar); - redactionMessageReceiver.receiveAnalyzeRequest(in, false); + redactionMessageReceiver.receiveAnalyzeRequest(ar, false); } simulateIncrement(List.of("Desiree"), "PII", 3L); ar.setMessageType(MessageType.REANALYSE); - String in = om.writeValueAsString(ar); - redactionMessageReceiver.receiveAnalyzeRequest(in, false); + redactionMessageReceiver.receiveAnalyzeRequest(ar, false); // var redactionLog = redactionStorageService.getRedactionLog("dossierId","fileId");