From 1c3f2e8174ce77ffd00e4e912c28aae67cc98ac7 Mon Sep 17 00:00:00 2001 From: Viktor Seifert Date: Fri, 1 Jul 2022 12:01:55 +0200 Subject: [PATCH] RED-1098: Implemented a receiver for dead messages from the pdftron-exchanges --- .../configuration/MessagingConfiguration.java | 23 +++++++- .../download/RedactionDlqMessageReceiver.java | 52 +++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionDlqMessageReceiver.java diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/configuration/MessagingConfiguration.java b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/configuration/MessagingConfiguration.java index bdfc6b53a..e213785b3 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/configuration/MessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/configuration/MessagingConfiguration.java @@ -49,6 +49,7 @@ public class MessagingConfiguration { public static final String PDF_2_IMAGE_DLQ = "pdf2image_dead_letter_queue"; public static final String PDFTRON_QUEUE = "pdftron_queue"; + public static final String PDFTRON_DLQ = "pdftron_dlq"; public static final String PDFTRON_RESULT_QUEUE = "pdftron_result_queue"; @@ -246,13 +247,31 @@ public class MessagingConfiguration { @Bean public Queue pdfTronQueue() { - return QueueBuilder.durable(PDFTRON_QUEUE).build(); + return QueueBuilder.durable(PDFTRON_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", PDFTRON_DLQ) + .withArgument("x-max-priority", 2) + .maxPriority(2) + .build(); } + + @Bean + public Queue pdfTronDlq() { + + return QueueBuilder.durable(PDFTRON_DLQ).build(); + } + + @Bean public Queue pdfTronResultQueue() { - return QueueBuilder.durable(PDFTRON_RESULT_QUEUE).build(); + return QueueBuilder.durable(PDFTRON_RESULT_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", PDFTRON_DLQ) + .withArgument("x-max-priority", 2) + .maxPriority(2) + .build(); } } diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionDlqMessageReceiver.java b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionDlqMessageReceiver.java new file mode 100644 index 000000000..a1041a219 --- /dev/null +++ b/persistence-service-v1/persistence-service-server-v1/src/main/java/com/iqser/red/service/peristence/v1/server/service/download/RedactionDlqMessageReceiver.java @@ -0,0 +1,52 @@ +package com.iqser.red.service.peristence.v1.server.service.download; + +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration; +import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService; +import com.iqser.red.service.persistence.service.v1.api.model.download.DownloadStatusValue; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +@RabbitListener(queues = MessagingConfiguration.PDFTRON_DLQ) +public class RedactionDlqMessageReceiver { + + private static final String LINE_SEPARATOR = System.lineSeparator(); + + ObjectMapper objectMapper; + DownloadStatusPersistenceService downloadStatusPersistenceService; + + + @RabbitHandler + public void receive(String in) throws JsonProcessingException { + + // Since we receive different message types here, we do not convert to an object here; + // We just assume that the message contains a downloadId. + JsonNode jsonNode = objectMapper.readTree(in); + final String downloadId; + try { + downloadId = jsonNode.findValue("downloadId").asText(); + } catch (Exception e) { + log.warn("Received a message in the " + MessagingConfiguration.PDFTRON_DLQ + " that contains no downloadId" + LINE_SEPARATOR + "{}", jsonNode.asText()); + throw new RuntimeException(e); + } + + log.info("Received a dead message with downloadId:{}, updating the download as failed", downloadId); + + downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED); + + } + +}