From ab3b30b28256a93fff79b642080ddc1d940992a9 Mon Sep 17 00:00:00 2001 From: Timo Date: Wed, 21 Apr 2021 14:11:22 +0300 Subject: [PATCH] prepared redaction service for queue handling --- .../redaction/v1/model/AnalyzeResult.java | 3 ++ .../redaction-service-server-v1/pom.xml | 12 +++++ .../server/queue/MessagingConfiguration.java | 35 ++++++++++++++ .../queue/RedactionMessageReceiver.java | 46 +++++++++++++++++++ .../service/AnalyzeResponseService.java | 5 +- .../redaction/service/ReanalyzeService.java | 20 +++++--- .../segmentation/SectionsBuilderService.java | 8 ++++ .../src/main/resources/application.yml | 14 ++++++ .../v1/server/RedactionIntegrationTest.java | 13 ++++++ 9 files changed, 149 insertions(+), 7 deletions(-) create mode 100644 redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessagingConfiguration.java create mode 100644 redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java diff --git a/redaction-service-v1/redaction-service-api-v1/src/main/java/com/iqser/red/service/redaction/v1/model/AnalyzeResult.java b/redaction-service-v1/redaction-service-api-v1/src/main/java/com/iqser/red/service/redaction/v1/model/AnalyzeResult.java index 5cc9de43..3875b00d 100644 --- a/redaction-service-v1/redaction-service-api-v1/src/main/java/com/iqser/red/service/redaction/v1/model/AnalyzeResult.java +++ b/redaction-service-v1/redaction-service-api-v1/src/main/java/com/iqser/red/service/redaction/v1/model/AnalyzeResult.java @@ -11,6 +11,9 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class AnalyzeResult { + private String projectId; + private String fileId; + private long duration; private int numberOfPages; private boolean hasHints; private boolean hasRequests; diff --git a/redaction-service-v1/redaction-service-server-v1/pom.xml b/redaction-service-v1/redaction-service-server-v1/pom.xml index 8f8d8db8..fd802d43 100644 --- a/redaction-service-v1/redaction-service-server-v1/pom.xml +++ b/redaction-service-v1/redaction-service-server-v1/pom.xml @@ -83,6 +83,12 @@ spring-cloud-starter-openfeign + + org.springframework.boot + spring-boot-starter-amqp + 2.3.1.RELEASE + + org.springframework.boot @@ -94,6 +100,12 @@ test-commons test + + org.springframework.amqp + spring-rabbit-test + 2.3.1 + test + diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessagingConfiguration.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessagingConfiguration.java new file mode 100644 index 00000000..965163c0 --- /dev/null +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/MessagingConfiguration.java @@ -0,0 +1,35 @@ +package com.iqser.red.service.redaction.v1.server.queue; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class MessagingConfiguration { + + + public static final String REDACTION_QUEUE = "redactionQueue"; + + public static final String REDACTION_DQL = "redactionDQL"; + + + @Bean + public Queue redactionQueue() { + + return QueueBuilder.durable(REDACTION_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", REDACTION_QUEUE) + .maxPriority(2) + .build(); + } + + + @Bean + public Queue redactionDeadLetterQueue() { + + return QueueBuilder.durable(REDACTION_DQL).build(); + } +} diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java new file mode 100644 index 00000000..69d62c60 --- /dev/null +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/queue/RedactionMessageReceiver.java @@ -0,0 +1,46 @@ +package com.iqser.red.service.redaction.v1.server.queue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.iqser.red.service.redaction.v1.model.AnalyzeRequest; +import com.iqser.red.service.redaction.v1.server.redaction.service.ReanalyzeService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; + +import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_DQL; +import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_QUEUE; + +@Slf4j +@Service +@RequiredArgsConstructor +public class RedactionMessageReceiver { + + private final ObjectMapper objectMapper; + private final ReanalyzeService reanalyzeService; + + @RabbitHandler + @RabbitListener(queues = REDACTION_QUEUE) + public void receiveAnalyzeRequest(String in) throws JsonProcessingException { + var analyzeRequest = objectMapper.readValue(in, AnalyzeRequest.class); + log.info("Processing analyze request: {}", analyzeRequest); + if (analyzeRequest.isReanalyseOnlyIfPossible()) { + reanalyzeService.reanalyze(analyzeRequest); + } else { + reanalyzeService.analyze(analyzeRequest); + } + log.info("Successfully analyzed {}", analyzeRequest); + // TODO respond to file-api + } + + @RabbitHandler + @RabbitListener(queues = REDACTION_DQL) + public void receiveAnalyzeRequestDQL(String in) throws JsonProcessingException { + var analyzeRequest = objectMapper.readValue(in, AnalyzeRequest.class); + log.info("Failed to process analyze request: {}", analyzeRequest); + // TODO respond to file-api + } + +} diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/AnalyzeResponseService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/AnalyzeResponseService.java index 97a739f9..677b37a3 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/AnalyzeResponseService.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/AnalyzeResponseService.java @@ -9,7 +9,7 @@ import org.springframework.stereotype.Service; @Service public class AnalyzeResponseService { - public AnalyzeResult createAnalyzeResponse(int pageCount, RedactionLog redactionLog, RedactionChangeLog redactionChangeLog) { + public AnalyzeResult createAnalyzeResponse(String projectId, String fileId, long duration, int pageCount, RedactionLog redactionLog, RedactionChangeLog redactionChangeLog) { boolean hasHints = redactionLog.getRedactionLogEntry().stream().anyMatch(RedactionLogEntry::isHint); boolean hasRequests = redactionLog.getRedactionLogEntry() @@ -31,6 +31,9 @@ public class AnalyzeResponseService { .isEmpty() && redactionChangeLog.getRedactionLogEntry().stream().anyMatch(entry -> !entry.getType().equals("false_positive")); return AnalyzeResult.builder() + .projectId(projectId) + .fileId(fileId) + .duration(duration) .numberOfPages(pageCount) .hasHints(hasHints) .hasRedactions(hasRedactions) diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/ReanalyzeService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/ReanalyzeService.java index a5bcd4f3..dfb14446 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/ReanalyzeService.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/redaction/service/ReanalyzeService.java @@ -38,6 +38,7 @@ public class ReanalyzeService { private final AnalyzeResponseService analyzeResponseService; public AnalyzeResult analyze(AnalyzeRequest analyzeRequest) { + long startTime = System.currentTimeMillis(); var pageCount = 0; Document classifiedDoc; @@ -70,12 +71,14 @@ public class ReanalyzeService { redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.TEXT, new Text(pageCount, classifiedDoc.getSectionText())); redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.SECTION_GRID, classifiedDoc.getSectionGrid()); - return analyzeResponseService.createAnalyzeResponse(pageCount, redactionLog, changeLog); + long duration = System.currentTimeMillis() - startTime; + return analyzeResponseService.createAnalyzeResponse(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), duration, pageCount, redactionLog, changeLog); } @SneakyThrows public AnalyzeResult reanalyze(@RequestBody AnalyzeRequest analyzeRequest) { + long startTime = System.currentTimeMillis(); var redactionLog = redactionStorageService.getRedactionLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId()); var text = redactionStorageService.getText(analyzeRequest.getProjectId(), analyzeRequest.getFileId()); @@ -131,10 +134,7 @@ public class ReanalyzeService { log.info("Should reanalyze {} sections for request: {}", sectionsToReanalyse.size(), analyzeRequest); if (sectionsToReanalyse.isEmpty() && (manualAdds == null || manualAdds.isEmpty())) { - redactionLog.setDictionaryVersion(dictionaryIncrement.getDictionaryVersion()); - var changeLog = redactionChangeLogService.createAndStoreChangeLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), redactionLog); - redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.REDACTION_LOG, redactionLog); - return analyzeResponseService.createAnalyzeResponse(text.getNumberOfPages(), redactionLog, changeLog); + return finalizeAnalysis(analyzeRequest, startTime, redactionLog, text, dictionaryIncrement); } List reanalysisSections = new ArrayList<>(); @@ -230,12 +230,20 @@ public class ReanalyzeService { redactionLog.getRedactionLogEntry().removeIf(entry -> sectionsToReanalyse.contains(entry.getSectionNumber()) && !entry.isImage() || entry.getSectionNumber() == 0 && !entry.isImage()); redactionLog.getRedactionLogEntry().addAll(newRedactionLogEntries); + return finalizeAnalysis + (analyzeRequest, startTime, redactionLog, text, dictionaryIncrement); + + } + + private AnalyzeResult finalizeAnalysis(@RequestBody AnalyzeRequest analyzeRequest, long startTime, RedactionLog redactionLog, Text text, DictionaryIncrement dictionaryIncrement) { redactionLog.setDictionaryVersion(dictionaryIncrement.getDictionaryVersion()); var changeLog = redactionChangeLogService.createAndStoreChangeLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), redactionLog); redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.REDACTION_LOG, redactionLog); - return analyzeResponseService.createAnalyzeResponse(text.getNumberOfPages(), redactionLog, changeLog); + long duration = System.currentTimeMillis() - startTime; + + return analyzeResponseService.createAnalyzeResponse(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), duration, text.getNumberOfPages(), redactionLog, changeLog); } diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/segmentation/SectionsBuilderService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/segmentation/SectionsBuilderService.java index 57da945f..7ee5a4c8 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/segmentation/SectionsBuilderService.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/segmentation/SectionsBuilderService.java @@ -113,6 +113,14 @@ public class SectionsBuilderService { paragraphMap.computeIfAbsent(1, x -> new TreeSet<>()).add(paragraph); } + // first page is always a paragraph, else we can't process pages 1..N, + // where N is the first found page with a paragraph + if (paragraphMap.get(1) == null) { + Paragraph paragraph = new Paragraph(); + document.getParagraphs().add(paragraph); + paragraphMap.computeIfAbsent(1, x -> new TreeSet<>()).add(paragraph); + } + for (Page page : document.getPages()) { for (PdfImage image : page.getImages()) { SortedSet paragraphsOnPage = paragraphMap.get(page.getPageNumber()); diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/resources/application.yml b/redaction-service-v1/redaction-service-server-v1/src/main/resources/application.yml index 15ff3651..5e895e5d 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/resources/application.yml +++ b/redaction-service-v1/redaction-service-server-v1/src/main/resources/application.yml @@ -10,6 +10,20 @@ server: spring: profiles: active: kubernetes + rabbitmq: + host: ${RABBITMQ_HOST:localhost} + port: ${RABBITMQ_PORT:5672} + username: ${RABBITMQ_USERNAME:user} + password: ${RABBITMQ_PASSWORD:rabbitmq} + listener: + simple: + acknowledge-mode: AUTO + concurrency: 2 + retry: + enabled: true + max-attempts: 3 + max-interval: 15000 + prefetch: 1 management: endpoint: diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RedactionIntegrationTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RedactionIntegrationTest.java index c74b653e..03df78f5 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RedactionIntegrationTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RedactionIntegrationTest.java @@ -26,6 +26,7 @@ import org.kie.api.builder.KieBuilder; import org.kie.api.builder.KieFileSystem; import org.kie.api.builder.KieModule; import org.kie.api.runtime.KieContainer; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; @@ -99,6 +100,9 @@ public class RedactionIntegrationTest { @MockBean private AmazonS3 amazonS3; + @MockBean + private RabbitTemplate rabbitTemplate; + private final Map> dictionary = new HashMap<>(); private final Map typeColorMap = new HashMap<>(); private final Map hintTypeMap = new HashMap<>(); @@ -450,6 +454,15 @@ public class RedactionIntegrationTest { } + @Test + public void test270Rotated() { + AnalyzeRequest request = prepareStorage("files/Minimal Examples/270Rotated.pdf"); + MemoryStats.printMemoryStats(); + AnalyzeResult result = redactionController.analyze(request); + assertThat(result).isNotNull(); + } + + @Test public void testLargeScannedFileOOM() { AnalyzeRequest request = prepareStorage("scanned/VV-377031.pdf");