RED-5151: Reject redelivered messages

This commit is contained in:
deiflaender 2022-09-27 13:00:08 +02:00
parent 11b1f479bf
commit 18569a0960
4 changed files with 52 additions and 17 deletions

View File

@ -1,14 +1,19 @@
package com.iqser.red.service.peristence.v1.server.service.download;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration;
import com.iqser.red.service.peristence.v1.server.model.DownloadJob;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@ -20,12 +25,24 @@ public class DownloadMessageReceiver {
private final ObjectMapper objectMapper;
@SneakyThrows
@RabbitHandler
public void receive(String in) throws JsonProcessingException {
public void receive(Message message) throws JsonProcessingException {
DownloadJob downloadJob = objectMapper.readValue(message.getBody(), DownloadJob.class);
// This prevents from endless retries oom errors.
if(message.getMessageProperties().isRedelivered()){
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with downloadId: %s, do not retry.", downloadJob.getStorageId()));
}
receive(downloadJob);
}
public void receive(DownloadJob downloadJob) throws JsonProcessingException {
DownloadJob downloadJob = objectMapper.readValue(in, DownloadJob.class);
log.info("Preparing download for userId: {} and storageId: {}", downloadJob.getUserId(), downloadJob.getStorageId());
downloadProcessorService.processDownload(downloadJob);
}

View File

@ -1,5 +1,7 @@
package com.iqser.red.service.peristence.v1.server.service.download;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@ -11,6 +13,7 @@ import com.iqser.red.service.redaction.report.v1.api.model.ReportResultMessage;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@ -25,13 +28,23 @@ public class DownloadReportMessageReceiver {
DownloadPreparationService downloadPreparationService;
@SneakyThrows
@RabbitHandler
public void receive(String in) throws JsonProcessingException {
public void receive(Message message){
ReportResultMessage reportResultMessage = objectMapper.readValue(in, ReportResultMessage.class);
ReportResultMessage reportResultMessage = objectMapper.readValue(message.getBody(), ReportResultMessage.class);
// This prevents from endless retries oom errors.
if(message.getMessageProperties().isRedelivered()){
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with downloadId: %s, do not retry.", reportResultMessage.getDownloadId()));
}
receive(reportResultMessage);
}
public void receive(ReportResultMessage reportResultMessage) throws JsonProcessingException {
log.info("Received request for userId:{} downloadId:{}", reportResultMessage.getUserId(), reportResultMessage.getDownloadId());
downloadPreparationService.createDownload(reportResultMessage);
}

View File

@ -1,5 +1,7 @@
package com.iqser.red.service.peristence.v1.server.service.download;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@ -11,6 +13,7 @@ import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfigu
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@ -25,10 +28,16 @@ public class RedactionResultMessageReceiver {
DownloadPreparationService downloadPreparationService;
@SneakyThrows
@RabbitHandler
public void receive(String in) throws JsonProcessingException {
public void receive(Message message) throws JsonProcessingException {
RedactionResultMessage redactionResultMessage = objectMapper.readValue(in, RedactionResultMessage.class);
RedactionResultMessage redactionResultMessage = objectMapper.readValue(message.getBody(), RedactionResultMessage.class);
// This prevents from endless retries oom errors.
if(message.getMessageProperties().isRedelivered()){
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with downloadId: %s, do not retry.", redactionResultMessage.getDownloadId()));
}
log.info("Received redaction results for downloadId:{}", redactionResultMessage.getDownloadId());

View File

@ -10,7 +10,6 @@ import java.util.stream.Collectors;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.peristence.v1.server.integration.client.DossierClient;
import com.iqser.red.service.peristence.v1.server.integration.client.DownloadClient;
import com.iqser.red.service.peristence.v1.server.integration.client.FileClient;
@ -43,10 +42,7 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
@Autowired
private DossierTesterAndProvider dossierTesterAndProvider;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private DownloadClient downloadClient;
@ -128,6 +124,6 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
reportResultMessage.setUserId("1");
reportResultMessage.setDownloadId(statuses.iterator().next().getStorageId());
downloadReportMessageReceiver.receive(objectMapper.writeValueAsString(reportResultMessage));
downloadReportMessageReceiver.receive(reportResultMessage);
}
}