diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadMessageReceiver.java b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadMessageReceiver.java index 2077b900b..81ffb7b3f 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadMessageReceiver.java +++ b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadMessageReceiver.java @@ -1,14 +1,19 @@ package com.iqser.red.service.peristence.v1.server.service.download; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration; import com.iqser.red.service.peristence.v1.server.model.DownloadJob; + import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Service; @Slf4j @Service @@ -20,12 +25,24 @@ public class DownloadMessageReceiver { private final ObjectMapper objectMapper; + @SneakyThrows @RabbitHandler - public void receive(String in) throws JsonProcessingException { + public void receive(Message message) throws JsonProcessingException { + + DownloadJob downloadJob = objectMapper.readValue(message.getBody(), DownloadJob.class); + + // This prevents from endless retries oom errors. + if(message.getMessageProperties().isRedelivered()){ + throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with downloadId: %s, do not retry.", downloadJob.getStorageId())); + } + + receive(downloadJob); + } + + + public void receive(DownloadJob downloadJob) throws JsonProcessingException { - DownloadJob downloadJob = objectMapper.readValue(in, DownloadJob.class); log.info("Preparing download for userId: {} and storageId: {}", downloadJob.getUserId(), downloadJob.getStorageId()); - downloadProcessorService.processDownload(downloadJob); } diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadReportMessageReceiver.java b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadReportMessageReceiver.java index d7f631ab1..97d246c7b 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadReportMessageReceiver.java +++ b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/DownloadReportMessageReceiver.java @@ -1,5 +1,7 @@ package com.iqser.red.service.peristence.v1.server.service.download; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @@ -11,6 +13,7 @@ import com.iqser.red.service.redaction.report.v1.api.model.ReportResultMessage; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.FieldDefaults; import lombok.extern.slf4j.Slf4j; @@ -25,13 +28,23 @@ public class DownloadReportMessageReceiver { DownloadPreparationService downloadPreparationService; + @SneakyThrows @RabbitHandler - public void receive(String in) throws JsonProcessingException { + public void receive(Message message){ - ReportResultMessage reportResultMessage = objectMapper.readValue(in, ReportResultMessage.class); + ReportResultMessage reportResultMessage = objectMapper.readValue(message.getBody(), ReportResultMessage.class); + + // This prevents from endless retries oom errors. + if(message.getMessageProperties().isRedelivered()){ + throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with downloadId: %s, do not retry.", reportResultMessage.getDownloadId())); + } + + receive(reportResultMessage); + } + + public void receive(ReportResultMessage reportResultMessage) throws JsonProcessingException { log.info("Received request for userId:{} downloadId:{}", reportResultMessage.getUserId(), reportResultMessage.getDownloadId()); - downloadPreparationService.createDownload(reportResultMessage); } diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionResultMessageReceiver.java b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionResultMessageReceiver.java index 246b8a72c..b532467ff 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionResultMessageReceiver.java +++ b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionResultMessageReceiver.java @@ -1,5 +1,7 @@ package com.iqser.red.service.peristence.v1.server.service.download; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @@ -11,6 +13,7 @@ import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfigu import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.FieldDefaults; import lombok.extern.slf4j.Slf4j; @@ -25,10 +28,16 @@ public class RedactionResultMessageReceiver { DownloadPreparationService downloadPreparationService; + @SneakyThrows @RabbitHandler - public void receive(String in) throws JsonProcessingException { + public void receive(Message message) throws JsonProcessingException { - RedactionResultMessage redactionResultMessage = objectMapper.readValue(in, RedactionResultMessage.class); + RedactionResultMessage redactionResultMessage = objectMapper.readValue(message.getBody(), RedactionResultMessage.class); + + // This prevents from endless retries oom errors. + if(message.getMessageProperties().isRedelivered()){ + throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with downloadId: %s, do not retry.", redactionResultMessage.getDownloadId())); + } log.info("Received redaction results for downloadId:{}", redactionResultMessage.getDownloadId()); diff --git a/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/tests/DownloadPreparationTest.java b/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/tests/DownloadPreparationTest.java index 5a7498dbe..47a302d60 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/tests/DownloadPreparationTest.java +++ b/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/tests/DownloadPreparationTest.java @@ -10,7 +10,6 @@ import java.util.stream.Collectors; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import com.fasterxml.jackson.databind.ObjectMapper; import com.iqser.red.service.peristence.v1.server.integration.client.DossierClient; import com.iqser.red.service.peristence.v1.server.integration.client.DownloadClient; import com.iqser.red.service.peristence.v1.server.integration.client.FileClient; @@ -43,10 +42,7 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes @Autowired private DossierTesterAndProvider dossierTesterAndProvider; - - @Autowired - private ObjectMapper objectMapper; - + @Autowired private DownloadClient downloadClient; @@ -128,6 +124,6 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes reportResultMessage.setUserId("1"); reportResultMessage.setDownloadId(statuses.iterator().next().getStorageId()); - downloadReportMessageReceiver.receive(objectMapper.writeValueAsString(reportResultMessage)); + downloadReportMessageReceiver.receive(reportResultMessage); } }