DM-410: Added timeout to Drools execution

This commit is contained in:
Dominique Eifländer 2023-09-08 09:00:26 +02:00
parent e21515979b
commit 15df495507
11 changed files with 169 additions and 141 deletions

View File

@ -16,7 +16,7 @@ val layoutParserVersion = "0.25.0"
val jacksonVersion = "2.15.2"
val droolsVersion = "8.43.0.Final"
val pdfBoxVersion = "3.0.0-alpha2"
val persistenceServiceVersion = "2.156.0"
val persistenceServiceVersion = "2.160.0"
configurations {
all {
@ -34,8 +34,8 @@ dependencies {
implementation("com.iqser.red.commons:metric-commons:2.3.0")
implementation("com.iqser.red.commons:dictionary-merge-commons:1.5.0")
implementation("com.iqser.red.commons:storage-commons:2.40.0")
implementation("com.knecon.fforesight:tenant-commons:0.10.0")
implementation("com.iqser.red.commons:storage-commons:2.43.0")
implementation("com.knecon.fforesight:tenant-commons:0.13.0")
implementation("com.fasterxml.jackson.module:jackson-module-afterburner:${jacksonVersion}")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}")

View File

@ -0,0 +1,24 @@
package com.iqser.red.service.redaction.v1.server.exception;
import lombok.Data;
@Data
public class DroolsTimeoutException extends RuntimeException {
private static final String DROOLS_TIMEOUT_MESSAGE = "Timeout during rules execution. Possible endless loop?";
private boolean reported;
public DroolsTimeoutException(Throwable cause, boolean reported) {
super(DROOLS_TIMEOUT_MESSAGE, cause);
this.reported = reported;
}
public DroolsTimeoutException(boolean reported) {
super(DROOLS_TIMEOUT_MESSAGE);
this.reported = reported;
}
}

View File

@ -1,32 +1,28 @@
package com.iqser.red.service.redaction.v1.server.queue;
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_DQL;
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_PRIORITY_QUEUE;
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.REDACTION_QUEUE;
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.X_ERROR_INFO_HEADER;
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER;
import static java.lang.String.format;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeResult;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
import com.iqser.red.service.redaction.v1.server.client.FileStatusProcessingUpdateClient;
import com.iqser.red.service.redaction.v1.server.client.RulesClient;
import com.iqser.red.service.redaction.v1.server.exception.DroolsTimeoutException;
import com.iqser.red.service.redaction.v1.server.redaction.service.AnalyzeService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import static com.iqser.red.service.redaction.v1.server.queue.MessagingConfiguration.*;
import static java.lang.String.format;
@Slf4j
@Service
@ -36,6 +32,7 @@ public class RedactionMessageReceiver {
private final ObjectMapper objectMapper;
private final AnalyzeService analyzeService;
private final FileStatusProcessingUpdateClient fileStatusProcessingUpdateClient;
private final RulesClient rulesClient;
@SneakyThrows
@ -104,6 +101,11 @@ public class RedactionMessageReceiver {
fileStatusProcessingUpdateClient.analysisSuccessful(analyzeRequest.getDossierId(), analyzeRequest.getFileId(), result);
} catch (Exception e) {
if (e instanceof DroolsTimeoutException && !((DroolsTimeoutException) e).isReported()) {
rulesClient.setRulesTimeoutDetected(analyzeRequest.getDossierTemplateId());
}
log.warn("Failed to process analyze request: {}", analyzeRequest, e);
var timestamp = OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
fileStatusProcessingUpdateClient.analysisFailed(analyzeRequest.getDossierId(),

View File

@ -71,6 +71,10 @@ public class AnalyzeService {
public AnalyzeResult analyze(AnalyzeRequest analyzeRequest) {
long startTime = System.currentTimeMillis();
var wrapper = droolsExecutionService.getLatestKieContainer(analyzeRequest.getDossierTemplateId());
log.info("Updated Rules to Version {} for file {} in dossier {}", wrapper.rulesVersion(), analyzeRequest.getFileId(), analyzeRequest.getDossierId());
Document document = DocumentGraphMapper.toDocumentGraph(redactionStorageService.getDocumentData(analyzeRequest.getDossierId(), analyzeRequest.getFileId()));
log.info("Loaded Document Graph for file {} in dossier {}", analyzeRequest.getFileId(), analyzeRequest.getDossierId());
@ -81,8 +85,6 @@ public class AnalyzeService {
Dictionary dictionary = dictionaryService.getDeepCopyDictionary(analyzeRequest.getDossierTemplateId(), analyzeRequest.getDossierId());
log.info("Updated Dictionaries for file {} in dossier {}", analyzeRequest.getFileId(), analyzeRequest.getDossierId());
var wrapper = droolsExecutionService.getLatestKieContainer(analyzeRequest.getDossierTemplateId());
log.info("Updated Rules to Version {} for file {} in dossier {}", wrapper.rulesVersion(), analyzeRequest.getFileId(), analyzeRequest.getDossierId());
List<ManualEntity> notFoundManualRedactionEntries = manualRedactionEntryService.addManualRedactionEntriesAndReturnNotFoundEntries(analyzeRequest, document);

View File

@ -5,7 +5,14 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.iqser.red.service.redaction.v1.server.exception.DroolsTimeoutException;
import com.iqser.red.service.redaction.v1.server.settings.RedactionServiceSettings;
import feign.FeignException;
import org.apache.commons.lang3.StringUtils;
import org.kie.api.KieServices;
import org.kie.api.builder.KieBuilder;
@ -15,6 +22,7 @@ import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.QueryResults;
import org.kie.api.runtime.rule.QueryResultsRow;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import com.iqser.red.service.persistence.service.v1.api.shared.model.FileAttribute;
@ -49,6 +57,8 @@ public class DroolsExecutionService {
DroolsSyntaxValidationFactory droolsSyntaxValidationFactory;
RedactionServiceSettings settings;
@Timed("redactmanager_executeRules")
public List<FileAttribute> executeRules(KieContainer kieContainer,
@ -99,7 +109,24 @@ public class DroolsExecutionService {
kieSession.insert(nerEntities);
kieSession.getAgenda().getAgendaGroup("LOCAL_DICTIONARY_ADDS").setFocus();
kieSession.fireAllRules();
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
kieSession.fireAllRules();
return null;
});
try {
completableFuture.orTimeout(settings.getDroolsExecutionTimeoutSecs(), TimeUnit.SECONDS).get();
} catch (ExecutionException e) {
kieSession.dispose();
if(e.getCause() instanceof TimeoutException){
throw new DroolsTimeoutException(e, false);
}
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
List<FileAttribute> resultingFileAttributes = getFileAttributes(kieSession);
kieSession.dispose();
return resultingFileAttributes;
@ -119,10 +146,16 @@ public class DroolsExecutionService {
public KieWrapper getLatestKieContainer(String dossierTemplateId) {
long version = rulesClient.getVersion(dossierTemplateId);
long version = -1;
try {
version = rulesClient.getVersion(dossierTemplateId);
} catch (FeignException fe) {
if (fe.status() == HttpStatus.UNPROCESSABLE_ENTITY.value()) {
throw new DroolsTimeoutException(fe.getCause(), true);
}
}
return new KieWrapper(getKieContainer(dossierTemplateId, version), version);
}

View File

@ -28,4 +28,6 @@ public class RedactionServiceSettings {
private int dictionaryCacheExpireAfterAccessDays = 3;
private int droolsExecutionTimeoutSecs = 300;
}

View File

@ -1,29 +1,5 @@
package com.iqser.red.service.redaction.v1.server;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.mockito.stubbing.Answer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.core.io.ClassPathResource;
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.dictionarymerge.commons.DictionaryEntry;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest;
@ -46,8 +22,23 @@ import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsi
import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingPipeline;
import com.knecon.fforesight.tenantcommons.TenantContext;
import com.knecon.fforesight.tenantcommons.TenantsClient;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.mockito.stubbing.Answer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.core.io.ClassPathResource;
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static org.mockito.Mockito.when;
public abstract class AbstractRedactionIntegrationTest {
@ -115,9 +106,6 @@ public abstract class AbstractRedactionIntegrationTest {
@Autowired
private LayoutParsingPipeline layoutParsingPipeline;
@MockBean
protected AmazonS3 amazonS3;
@MockBean
protected RabbitTemplate rabbitTemplate;

View File

@ -1,8 +1,13 @@
package com.iqser.red.service.redaction.v1.server;
import static java.io.File.createTempFile;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.commons.jackson.ObjectMapperFactory;
import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist;
import com.iqser.red.storage.commons.service.StorageService;
import lombok.SneakyThrows;
import org.apache.commons.io.IOUtils;
import org.springframework.core.io.InputStreamResource;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@ -12,16 +17,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.commons.jackson.ObjectMapperFactory;
import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist;
import com.iqser.red.storage.commons.service.StorageService;
import lombok.SneakyThrows;
import static java.io.File.createTempFile;
public class FileSystemBackedStorageService implements StorageService {
@ -45,6 +42,12 @@ public class FileSystemBackedStorageService implements StorageService {
IOUtils.copy(new FileInputStream(res), new FileOutputStream(destinationFile));
}
@Override
@SneakyThrows
public InputStreamResource getObject(String tenantId, String objectId) {
return new InputStreamResource(new FileInputStream(dataMap.get(objectId)));
}
@Override
public void deleteObject(String tenantId, String objectId) {

View File

@ -211,7 +211,7 @@ public class RedactionIntegrationTest extends AbstractRedactionIntegrationTest {
@Test
public void titleExtraction() throws IOException {
AnalyzeRequest request = uploadFileToStorage("files/new/crafted document.pdf");
AnalyzeRequest request = uploadFileToStorage("files/Metolachlor/S-Metolachlor_RAR_01_Volume_1_2018-09-06.pdf");
System.out.println("Start Full integration test");
analyzeDocumentStructure(LayoutParsingType.REDACT_MANAGER, request);
System.out.println("Finished structure analysis");

View File

@ -1,32 +1,34 @@
package com.iqser.red.service.redaction.v1.server;
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.iqser.red.service.dictionarymerge.commons.DictionaryEntry;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.common.JSONPrimitive;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.configuration.Colors;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.type.Type;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.*;
import com.iqser.red.service.redaction.v1.server.client.DictionaryClient;
import com.iqser.red.service.redaction.v1.server.client.LegalBasisClient;
import com.iqser.red.service.redaction.v1.server.client.RulesClient;
import com.iqser.red.service.redaction.v1.server.controller.RedactionController;
import com.iqser.red.service.redaction.v1.server.redaction.service.AnalyzeService;
import com.iqser.red.service.redaction.v1.server.redaction.utils.ResourceLoader;
import com.iqser.red.service.redaction.v1.server.redaction.utils.TextNormalizationUtilities;
import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService;
import com.iqser.red.service.redaction.v1.server.utils.LayoutParsingRequestProvider;
import com.iqser.red.storage.commons.StorageAutoConfiguration;
import com.iqser.red.storage.commons.service.StorageService;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingFinishedEvent;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingType;
import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingPipeline;
import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingServiceProcessorConfiguration;
import com.knecon.fforesight.tenantcommons.TenantContext;
import com.knecon.fforesight.tenantcommons.TenantsClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -45,53 +47,26 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.*;
import org.springframework.core.io.ClassPathResource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.iqser.red.service.dictionarymerge.commons.DictionaryEntry;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.common.JSONPrimitive;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.configuration.Colors;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.type.Type;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.Change;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.Engine;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.ManualChange;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.Rectangle;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.RedactionLog;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.RedactionLogComment;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.RedactionLogEntry;
import com.iqser.red.service.persistence.service.v1.api.shared.model.redactionlog.RedactionLogLegalBasis;
import com.iqser.red.service.redaction.v1.server.client.DictionaryClient;
import com.iqser.red.service.redaction.v1.server.client.LegalBasisClient;
import com.iqser.red.service.redaction.v1.server.client.RulesClient;
import com.iqser.red.service.redaction.v1.server.controller.RedactionController;
import com.iqser.red.service.redaction.v1.server.redaction.service.AnalyzeService;
import com.iqser.red.service.redaction.v1.server.redaction.utils.ResourceLoader;
import com.iqser.red.service.redaction.v1.server.redaction.utils.TextNormalizationUtilities;
import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService;
import com.iqser.red.service.redaction.v1.server.utils.LayoutParsingRequestProvider;
import com.iqser.red.storage.commons.StorageAutoConfiguration;
import com.iqser.red.storage.commons.service.StorageService;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingFinishedEvent;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingType;
import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingPipeline;
import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingServiceProcessorConfiguration;
import com.knecon.fforesight.tenantcommons.TenantContext;
import com.knecon.fforesight.tenantcommons.TenantsClient;
import java.io.*;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@Slf4j
@ExtendWith(SpringExtension.class)
@ -244,8 +219,6 @@ public class RulesTest {
@Autowired
private StorageService storageService;
@MockBean
private AmazonS3 amazonS3;
@MockBean
private RabbitTemplate rabbitTemplate;
@MockBean
private LegalBasisClient legalBasisClient;

View File

@ -3,6 +3,7 @@ package com.iqser.red.service.redaction.v1.server.redaction.service;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.iqser.red.service.redaction.v1.server.settings.RedactionServiceSettings;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
@ -34,7 +35,7 @@ class DroolsExecutionServiceTest {
@SneakyThrows
void testRules() {
DroolsExecutionService droolsExecutionService = new DroolsExecutionService(rulesClient, entityEnrichmentService, new DroolsSyntaxValidationFactory());
DroolsExecutionService droolsExecutionService = new DroolsExecutionService(rulesClient, entityEnrichmentService, new DroolsSyntaxValidationFactory(), new RedactionServiceSettings());
var rulesFile = new ClassPathResource("drools/rules.drl");
String rulesString = new String(rulesFile.getInputStream().readAllBytes());
@ -47,7 +48,7 @@ class DroolsExecutionServiceTest {
@SneakyThrows
void testAllRules() {
DroolsExecutionService droolsExecutionService = new DroolsExecutionService(rulesClient, entityEnrichmentService, new DroolsSyntaxValidationFactory());
DroolsExecutionService droolsExecutionService = new DroolsExecutionService(rulesClient, entityEnrichmentService, new DroolsSyntaxValidationFactory(), new RedactionServiceSettings());
var rulesFile = new ClassPathResource("drools/all_rules.drl");
String rulesString = new String(rulesFile.getInputStream().readAllBytes());
@ -61,7 +62,7 @@ class DroolsExecutionServiceTest {
@SneakyThrows
void testCorruptedRules() {
DroolsExecutionService droolsExecutionService = new DroolsExecutionService(rulesClient, entityEnrichmentService, new DroolsSyntaxValidationFactory());
DroolsExecutionService droolsExecutionService = new DroolsExecutionService(rulesClient, entityEnrichmentService, new DroolsSyntaxValidationFactory(), new RedactionServiceSettings());
var rulesFile = new ClassPathResource("drools/rules.drl");
String rulesString = new String(rulesFile.getInputStream().readAllBytes());