Pull request #475: RED-5151: Do not retry messages on oom errors
Merge in RED/redaction-service from RED-5151 to master * commit '0e71613ffcce147b2ae659e47ad6ab1c6405f0e7': RED-5151: Do not retry messages on oom errors
This commit is contained in:
commit
ec60d0eab4
@ -2,6 +2,7 @@ package com.iqser.red.service.redaction.v1.server.queue;
|
||||
|
||||
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_QUEUE;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
|
||||
@ -18,8 +19,8 @@ public class MessageReceiver {
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = REDACTION_QUEUE)
|
||||
public void receiveAnalyzeRequest(String in) throws JsonProcessingException {
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(in, false);
|
||||
public void receiveAnalyzeRequest(Message message) {
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(message, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package com.iqser.red.service.redaction.v1.server.queue;
|
||||
|
||||
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_PRIORITY_QUEUE;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
|
||||
@ -19,9 +20,9 @@ public class PriorityMessageReceiver {
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = REDACTION_PRIORITY_QUEUE)
|
||||
public void receiveAnalyzeRequest(String in) throws JsonProcessingException {
|
||||
public void receiveAnalyzeRequest(Message message) {
|
||||
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(in, true);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(message, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -2,6 +2,8 @@ package com.iqser.red.service.redaction.v1.server.queue;
|
||||
|
||||
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_DQL;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -16,6 +18,7 @@ import com.iqser.red.service.redaction.v1.server.redaction.service.AnalyzeServic
|
||||
import com.iqser.red.service.redaction.v1.server.redaction.service.ManualRedactionSurroundingTextService;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@ -29,11 +32,24 @@ public class RedactionMessageReceiver {
|
||||
private final ManualRedactionSurroundingTextService manualRedactionSurroundingTextService;
|
||||
|
||||
|
||||
public void receiveAnalyzeRequest(String in, boolean priority) throws JsonProcessingException {
|
||||
@SneakyThrows
|
||||
public void receiveAnalyzeRequest(Message message, boolean priority) {
|
||||
|
||||
var analyzeRequest = objectMapper.readValue(message.getBody(), AnalyzeRequest.class);
|
||||
|
||||
// This prevents from endless retries oom errors.
|
||||
if(message.getMessageProperties().isRedelivered()){
|
||||
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with dossierId: %s and fileId: %s, do not retry.", analyzeRequest.getDossierId(), analyzeRequest.getFileId()));
|
||||
}
|
||||
|
||||
receiveAnalyzeRequest(analyzeRequest, priority);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public void receiveAnalyzeRequest(AnalyzeRequest analyzeRequest, boolean priority) {
|
||||
|
||||
var analyzeRequest = objectMapper.readValue(in, AnalyzeRequest.class);
|
||||
log.info("Processing priority: {} analyze request for file: {}", priority, analyzeRequest.getFileId());
|
||||
AnalyzeResult result = null;
|
||||
AnalyzeResult result;
|
||||
|
||||
try {
|
||||
switch (analyzeRequest.getMessageType()) {
|
||||
|
||||
@ -64,8 +64,7 @@ public class AnalyseFileRealDataIntegrationTest extends LiveDataIntegrationTest
|
||||
log.info("No text file provided, Performing Structure analysis");
|
||||
|
||||
ar.setMessageType(MessageType.STRUCTURE_ANALYSE);
|
||||
String in = om.writeValueAsString(ar);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(in, false);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(ar, false);
|
||||
}
|
||||
|
||||
|
||||
@ -75,15 +74,13 @@ public class AnalyseFileRealDataIntegrationTest extends LiveDataIntegrationTest
|
||||
log.info("No redaction log provided, Performing full analysis");
|
||||
|
||||
ar.setMessageType(MessageType.ANALYSE);
|
||||
String in = om.writeValueAsString(ar);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(in, false);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(ar, false);
|
||||
}
|
||||
|
||||
|
||||
simulateIncrement(List.of("Desiree"), "PII", 3L);
|
||||
ar.setMessageType(MessageType.REANALYSE);
|
||||
String in = om.writeValueAsString(ar);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(in, false);
|
||||
redactionMessageReceiver.receiveAnalyzeRequest(ar, false);
|
||||
|
||||
|
||||
// var redactionLog = redactionStorageService.getRedactionLog("dossierId","fileId");
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user