Pull request #143: prepared redaction service for queue handling
Merge in RED/redaction-service from redaction-system-queue-prep to master * commit 'ab3b30b28256a93fff79b642080ddc1d940992a9': prepared redaction service for queue handling
This commit is contained in:
commit
b22e3d93c0
@ -11,6 +11,9 @@ import lombok.NoArgsConstructor;
|
|||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class AnalyzeResult {
|
public class AnalyzeResult {
|
||||||
|
|
||||||
|
private String projectId;
|
||||||
|
private String fileId;
|
||||||
|
private long duration;
|
||||||
private int numberOfPages;
|
private int numberOfPages;
|
||||||
private boolean hasHints;
|
private boolean hasHints;
|
||||||
private boolean hasRequests;
|
private boolean hasRequests;
|
||||||
|
|||||||
@ -83,6 +83,12 @@
|
|||||||
<artifactId>spring-cloud-starter-openfeign</artifactId>
|
<artifactId>spring-cloud-starter-openfeign</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||||
|
<version>2.3.1.RELEASE</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- test dependencies -->
|
<!-- test dependencies -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
@ -94,6 +100,12 @@
|
|||||||
<artifactId>test-commons</artifactId>
|
<artifactId>test-commons</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.amqp</groupId>
|
||||||
|
<artifactId>spring-rabbit-test</artifactId>
|
||||||
|
<version>2.3.1</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@ -0,0 +1,35 @@
|
|||||||
|
package com.iqser.red.service.redaction.v1.server.queue;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.amqp.core.Queue;
|
||||||
|
import org.springframework.amqp.core.QueueBuilder;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class MessagingConfiguration {
|
||||||
|
|
||||||
|
|
||||||
|
public static final String REDACTION_QUEUE = "redactionQueue";
|
||||||
|
|
||||||
|
public static final String REDACTION_DQL = "redactionDQL";
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue redactionQueue() {
|
||||||
|
|
||||||
|
return QueueBuilder.durable(REDACTION_QUEUE)
|
||||||
|
.withArgument("x-dead-letter-exchange", "")
|
||||||
|
.withArgument("x-dead-letter-routing-key", REDACTION_QUEUE)
|
||||||
|
.maxPriority(2)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue redactionDeadLetterQueue() {
|
||||||
|
|
||||||
|
return QueueBuilder.durable(REDACTION_DQL).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,46 @@
|
|||||||
|
package com.iqser.red.service.redaction.v1.server.queue;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.iqser.red.service.redaction.v1.model.AnalyzeRequest;
|
||||||
|
import com.iqser.red.service.redaction.v1.server.redaction.service.ReanalyzeService;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_DQL;
|
||||||
|
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_QUEUE;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class RedactionMessageReceiver {
|
||||||
|
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
private final ReanalyzeService reanalyzeService;
|
||||||
|
|
||||||
|
@RabbitHandler
|
||||||
|
@RabbitListener(queues = REDACTION_QUEUE)
|
||||||
|
public void receiveAnalyzeRequest(String in) throws JsonProcessingException {
|
||||||
|
var analyzeRequest = objectMapper.readValue(in, AnalyzeRequest.class);
|
||||||
|
log.info("Processing analyze request: {}", analyzeRequest);
|
||||||
|
if (analyzeRequest.isReanalyseOnlyIfPossible()) {
|
||||||
|
reanalyzeService.reanalyze(analyzeRequest);
|
||||||
|
} else {
|
||||||
|
reanalyzeService.analyze(analyzeRequest);
|
||||||
|
}
|
||||||
|
log.info("Successfully analyzed {}", analyzeRequest);
|
||||||
|
// TODO respond to file-api
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitHandler
|
||||||
|
@RabbitListener(queues = REDACTION_DQL)
|
||||||
|
public void receiveAnalyzeRequestDQL(String in) throws JsonProcessingException {
|
||||||
|
var analyzeRequest = objectMapper.readValue(in, AnalyzeRequest.class);
|
||||||
|
log.info("Failed to process analyze request: {}", analyzeRequest);
|
||||||
|
// TODO respond to file-api
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
|
|||||||
@Service
|
@Service
|
||||||
public class AnalyzeResponseService {
|
public class AnalyzeResponseService {
|
||||||
|
|
||||||
public AnalyzeResult createAnalyzeResponse(int pageCount, RedactionLog redactionLog, RedactionChangeLog redactionChangeLog) {
|
public AnalyzeResult createAnalyzeResponse(String projectId, String fileId, long duration, int pageCount, RedactionLog redactionLog, RedactionChangeLog redactionChangeLog) {
|
||||||
boolean hasHints = redactionLog.getRedactionLogEntry().stream().anyMatch(RedactionLogEntry::isHint);
|
boolean hasHints = redactionLog.getRedactionLogEntry().stream().anyMatch(RedactionLogEntry::isHint);
|
||||||
|
|
||||||
boolean hasRequests = redactionLog.getRedactionLogEntry()
|
boolean hasRequests = redactionLog.getRedactionLogEntry()
|
||||||
@ -31,6 +31,9 @@ public class AnalyzeResponseService {
|
|||||||
.isEmpty() && redactionChangeLog.getRedactionLogEntry().stream().anyMatch(entry -> !entry.getType().equals("false_positive"));
|
.isEmpty() && redactionChangeLog.getRedactionLogEntry().stream().anyMatch(entry -> !entry.getType().equals("false_positive"));
|
||||||
|
|
||||||
return AnalyzeResult.builder()
|
return AnalyzeResult.builder()
|
||||||
|
.projectId(projectId)
|
||||||
|
.fileId(fileId)
|
||||||
|
.duration(duration)
|
||||||
.numberOfPages(pageCount)
|
.numberOfPages(pageCount)
|
||||||
.hasHints(hasHints)
|
.hasHints(hasHints)
|
||||||
.hasRedactions(hasRedactions)
|
.hasRedactions(hasRedactions)
|
||||||
|
|||||||
@ -38,6 +38,7 @@ public class ReanalyzeService {
|
|||||||
private final AnalyzeResponseService analyzeResponseService;
|
private final AnalyzeResponseService analyzeResponseService;
|
||||||
|
|
||||||
public AnalyzeResult analyze(AnalyzeRequest analyzeRequest) {
|
public AnalyzeResult analyze(AnalyzeRequest analyzeRequest) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
var pageCount = 0;
|
var pageCount = 0;
|
||||||
Document classifiedDoc;
|
Document classifiedDoc;
|
||||||
@ -70,12 +71,14 @@ public class ReanalyzeService {
|
|||||||
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.TEXT, new Text(pageCount, classifiedDoc.getSectionText()));
|
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.TEXT, new Text(pageCount, classifiedDoc.getSectionText()));
|
||||||
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.SECTION_GRID, classifiedDoc.getSectionGrid());
|
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.SECTION_GRID, classifiedDoc.getSectionGrid());
|
||||||
|
|
||||||
return analyzeResponseService.createAnalyzeResponse(pageCount, redactionLog, changeLog);
|
long duration = System.currentTimeMillis() - startTime;
|
||||||
|
return analyzeResponseService.createAnalyzeResponse(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), duration, pageCount, redactionLog, changeLog);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public AnalyzeResult reanalyze(@RequestBody AnalyzeRequest analyzeRequest) {
|
public AnalyzeResult reanalyze(@RequestBody AnalyzeRequest analyzeRequest) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
var redactionLog = redactionStorageService.getRedactionLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId());
|
var redactionLog = redactionStorageService.getRedactionLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId());
|
||||||
var text = redactionStorageService.getText(analyzeRequest.getProjectId(), analyzeRequest.getFileId());
|
var text = redactionStorageService.getText(analyzeRequest.getProjectId(), analyzeRequest.getFileId());
|
||||||
@ -131,10 +134,7 @@ public class ReanalyzeService {
|
|||||||
log.info("Should reanalyze {} sections for request: {}", sectionsToReanalyse.size(), analyzeRequest);
|
log.info("Should reanalyze {} sections for request: {}", sectionsToReanalyse.size(), analyzeRequest);
|
||||||
|
|
||||||
if (sectionsToReanalyse.isEmpty() && (manualAdds == null || manualAdds.isEmpty())) {
|
if (sectionsToReanalyse.isEmpty() && (manualAdds == null || manualAdds.isEmpty())) {
|
||||||
redactionLog.setDictionaryVersion(dictionaryIncrement.getDictionaryVersion());
|
return finalizeAnalysis(analyzeRequest, startTime, redactionLog, text, dictionaryIncrement);
|
||||||
var changeLog = redactionChangeLogService.createAndStoreChangeLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), redactionLog);
|
|
||||||
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.REDACTION_LOG, redactionLog);
|
|
||||||
return analyzeResponseService.createAnalyzeResponse(text.getNumberOfPages(), redactionLog, changeLog);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
List<SectionText> reanalysisSections = new ArrayList<>();
|
List<SectionText> reanalysisSections = new ArrayList<>();
|
||||||
@ -230,12 +230,20 @@ public class ReanalyzeService {
|
|||||||
|
|
||||||
redactionLog.getRedactionLogEntry().removeIf(entry -> sectionsToReanalyse.contains(entry.getSectionNumber()) && !entry.isImage() || entry.getSectionNumber() == 0 && !entry.isImage());
|
redactionLog.getRedactionLogEntry().removeIf(entry -> sectionsToReanalyse.contains(entry.getSectionNumber()) && !entry.isImage() || entry.getSectionNumber() == 0 && !entry.isImage());
|
||||||
redactionLog.getRedactionLogEntry().addAll(newRedactionLogEntries);
|
redactionLog.getRedactionLogEntry().addAll(newRedactionLogEntries);
|
||||||
|
return finalizeAnalysis
|
||||||
|
(analyzeRequest, startTime, redactionLog, text, dictionaryIncrement);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private AnalyzeResult finalizeAnalysis(@RequestBody AnalyzeRequest analyzeRequest, long startTime, RedactionLog redactionLog, Text text, DictionaryIncrement dictionaryIncrement) {
|
||||||
redactionLog.setDictionaryVersion(dictionaryIncrement.getDictionaryVersion());
|
redactionLog.setDictionaryVersion(dictionaryIncrement.getDictionaryVersion());
|
||||||
|
|
||||||
var changeLog = redactionChangeLogService.createAndStoreChangeLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), redactionLog);
|
var changeLog = redactionChangeLogService.createAndStoreChangeLog(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), redactionLog);
|
||||||
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.REDACTION_LOG, redactionLog);
|
redactionStorageService.storeObject(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), FileType.REDACTION_LOG, redactionLog);
|
||||||
return analyzeResponseService.createAnalyzeResponse(text.getNumberOfPages(), redactionLog, changeLog);
|
|
||||||
|
|
||||||
|
long duration = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
return analyzeResponseService.createAnalyzeResponse(analyzeRequest.getProjectId(), analyzeRequest.getFileId(), duration, text.getNumberOfPages(), redactionLog, changeLog);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -113,6 +113,14 @@ public class SectionsBuilderService {
|
|||||||
paragraphMap.computeIfAbsent(1, x -> new TreeSet<>()).add(paragraph);
|
paragraphMap.computeIfAbsent(1, x -> new TreeSet<>()).add(paragraph);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// first page is always a paragraph, else we can't process pages 1..N,
|
||||||
|
// where N is the first found page with a paragraph
|
||||||
|
if (paragraphMap.get(1) == null) {
|
||||||
|
Paragraph paragraph = new Paragraph();
|
||||||
|
document.getParagraphs().add(paragraph);
|
||||||
|
paragraphMap.computeIfAbsent(1, x -> new TreeSet<>()).add(paragraph);
|
||||||
|
}
|
||||||
|
|
||||||
for (Page page : document.getPages()) {
|
for (Page page : document.getPages()) {
|
||||||
for (PdfImage image : page.getImages()) {
|
for (PdfImage image : page.getImages()) {
|
||||||
SortedSet<Paragraph> paragraphsOnPage = paragraphMap.get(page.getPageNumber());
|
SortedSet<Paragraph> paragraphsOnPage = paragraphMap.get(page.getPageNumber());
|
||||||
|
|||||||
@ -10,6 +10,20 @@ server:
|
|||||||
spring:
|
spring:
|
||||||
profiles:
|
profiles:
|
||||||
active: kubernetes
|
active: kubernetes
|
||||||
|
rabbitmq:
|
||||||
|
host: ${RABBITMQ_HOST:localhost}
|
||||||
|
port: ${RABBITMQ_PORT:5672}
|
||||||
|
username: ${RABBITMQ_USERNAME:user}
|
||||||
|
password: ${RABBITMQ_PASSWORD:rabbitmq}
|
||||||
|
listener:
|
||||||
|
simple:
|
||||||
|
acknowledge-mode: AUTO
|
||||||
|
concurrency: 2
|
||||||
|
retry:
|
||||||
|
enabled: true
|
||||||
|
max-attempts: 3
|
||||||
|
max-interval: 15000
|
||||||
|
prefetch: 1
|
||||||
|
|
||||||
management:
|
management:
|
||||||
endpoint:
|
endpoint:
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import org.kie.api.builder.KieBuilder;
|
|||||||
import org.kie.api.builder.KieFileSystem;
|
import org.kie.api.builder.KieFileSystem;
|
||||||
import org.kie.api.builder.KieModule;
|
import org.kie.api.builder.KieModule;
|
||||||
import org.kie.api.runtime.KieContainer;
|
import org.kie.api.runtime.KieContainer;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
import org.springframework.boot.test.context.TestConfiguration;
|
import org.springframework.boot.test.context.TestConfiguration;
|
||||||
@ -99,6 +100,9 @@ public class RedactionIntegrationTest {
|
|||||||
@MockBean
|
@MockBean
|
||||||
private AmazonS3 amazonS3;
|
private AmazonS3 amazonS3;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
private final Map<String, List<String>> dictionary = new HashMap<>();
|
private final Map<String, List<String>> dictionary = new HashMap<>();
|
||||||
private final Map<String, String> typeColorMap = new HashMap<>();
|
private final Map<String, String> typeColorMap = new HashMap<>();
|
||||||
private final Map<String, Boolean> hintTypeMap = new HashMap<>();
|
private final Map<String, Boolean> hintTypeMap = new HashMap<>();
|
||||||
@ -450,6 +454,15 @@ public class RedactionIntegrationTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test270Rotated() {
|
||||||
|
AnalyzeRequest request = prepareStorage("files/Minimal Examples/270Rotated.pdf");
|
||||||
|
MemoryStats.printMemoryStats();
|
||||||
|
AnalyzeResult result = redactionController.analyze(request);
|
||||||
|
assertThat(result).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLargeScannedFileOOM() {
|
public void testLargeScannedFileOOM() {
|
||||||
AnalyzeRequest request = prepareStorage("scanned/VV-377031.pdf");
|
AnalyzeRequest request = prepareStorage("scanned/VV-377031.pdf");
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user