Merge branch 'SPIKE-LLM_NER' into 'master'

Spike: LLM NER

See merge request redactmanager/persistence-service!699
This commit is contained in:
Kilian Schüttler 2024-08-27 10:28:29 +02:00
commit bb4af4c1ce
17 changed files with 382 additions and 28 deletions

View File

@ -42,6 +42,7 @@ dependencies {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
}
implementation("com.knecon.fforesight:llm-service-api:1.10.0")
api("com.knecon.fforesight:jobs-commons:0.10.0")
api("com.knecon.fforesight:database-tenant-commons:0.24.0")
api("com.knecon.fforesight:keycloak-commons:0.29.0")

View File

@ -8,6 +8,8 @@ import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.llm.service.QueueNames;
import lombok.RequiredArgsConstructor;
@Configuration
@ -54,6 +56,14 @@ public class MessagingConfiguration {
public static final String NER_SERVICE_RESPONSE_QUEUE = "entity_response_queue";
public static final String NER_SERVICE_DLQ = "entity_dead_letter_queue";
public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue";
public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue";
public static final String CHUNKING_SERVICE_DLQ = "chunking_service_dlq";
public static final String LLM_NER_SERVICE_QUEUE = QueueNames.LLM_NER_SERVICE_QUEUE;
public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE;
public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ;
public static final String AZURE_NER_SERVICE_QUEUE = "azure_entity_request_queue";
public static final String AZURE_NER_SERVICE_RESPONSE_QUEUE = "azure_entity_response_queue";
public static final String AZURE_NER_SERVICE_DLQ = "azure_entity_dead_letter_queue";
@ -77,7 +87,6 @@ public class MessagingConfiguration {
public static final String X_ERROR_INFO_HEADER = "x-error-message";
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dead_letter_queue";
// --- Saas Migration, can be removed later ----
@ -156,6 +165,54 @@ public class MessagingConfiguration {
}
@Bean
// Request queue for the LLM NER service. Failed messages are dead-lettered via the
// default exchange ("") with the DLQ name as routing key (classic RabbitMQ DLQ wiring).
public Queue llmNerRequestQueue() {
return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build();
}
@Bean
// Response queue for the LLM NER service; dead-letters to the same LLM NER DLQ as the
// request queue. Single-line builder chain for consistency with llmNerRequestQueue.
public Queue llmNerResponseQueue() {
    return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build();
}
@Bean
// Dead-letter queue shared by BOTH the LLM NER request and response queues, despite
// the "Response" in the bean name — both queues route failures to LLM_NER_SERVICE_DLQ.
public Queue llmNerResponseDLQ() {
return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build();
}
@Bean
// Request queue for the chunking service; failed messages go to the chunking DLQ
// via the default exchange ("") with the DLQ name as routing key.
public Queue chunkingRequestQueue() {
return QueueBuilder.durable(CHUNKING_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ).build();
}
@Bean
// Response queue for the chunking service; dead-letters to the shared chunking DLQ.
// Single-line builder chain for consistency with chunkingRequestQueue.
public Queue chunkingResponseQueue() {
    return QueueBuilder.durable(CHUNKING_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ).build();
}
@Bean
// Dead-letter queue shared by the chunking request and response queues.
public Queue chunkingDLQ() {
return QueueBuilder.durable(CHUNKING_SERVICE_DLQ).build();
}
@Bean
public Queue imageRequestQueue() {
@ -276,19 +333,18 @@ public class MessagingConfiguration {
.build();
}
@Bean
public Queue ocrResponseQueue() {
return QueueBuilder.durable(OCR_RESPONSE_QUEUE)
.build();
return QueueBuilder.durable(OCR_RESPONSE_QUEUE).build();
}
@Bean
public Queue ocrDLQ() {
return QueueBuilder.durable(OCR_DLQ)
.build();
return QueueBuilder.durable(OCR_DLQ).build();
}

View File

@ -22,10 +22,12 @@ public class FileExchangeNames {
public static Definition STRUCTURE = new Definition(FileType.DOCUMENT_STRUCTURE);
public static Definition PAGES = new Definition(FileType.DOCUMENT_PAGES);
public static Definition CHUNKS = new Definition(FileType.DOCUMENT_CHUNKS);
public static Definition TEXT = new Definition(FileType.DOCUMENT_TEXT);
public static Definition POSITIONS = new Definition(FileType.DOCUMENT_POSITION);
public static Definition SIMPLIFIED_TEXT = new Definition(FileType.SIMPLIFIED_TEXT);
public static Definition NER_ENTITIES = new Definition(FileType.NER_ENTITIES);
public static Definition LLM_NER_ENTITIES = new Definition(FileType.LLM_NER_ENTITIES);
public static Definition AZURE_NER_ENTITIES = new Definition(FileType.AZURE_NER_ENTITIES);
public static Definition TABLES = new Definition(FileType.TABLES);

View File

@ -62,8 +62,10 @@ public class FileExportService {
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.POSITIONS);
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.PAGES);
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.NER_ENTITIES);
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.LLM_NER_ENTITIES);
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.AZURE_NER_ENTITIES);
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.SIMPLIFIED_TEXT);
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.CHUNKS);
}
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.FIGURE);

View File

@ -100,6 +100,7 @@ public class FileExchangeArchiveReader {
FileExchangeNames.SIMPLIFIED_TEXT,
FileExchangeNames.NER_ENTITIES,
FileExchangeNames.AZURE_NER_ENTITIES,
FileExchangeNames.LLM_NER_ENTITIES,
FileExchangeNames.TABLES,
FileExchangeNames.IMAGES,
FileExchangeNames.VISUAL_LAYOUT,
@ -107,6 +108,7 @@ public class FileExchangeArchiveReader {
FileExchangeNames.HIGHLIGHTS,
FileExchangeNames.FIGURE,
FileExchangeNames.ORIGIN,
FileExchangeNames.CHUNKS,
FileExchangeNames.UNTOUCHED,
FileExchangeNames.VIEWER);
if (optionalFileType.isPresent()) {

View File

@ -0,0 +1,48 @@
package com.iqser.red.service.persistence.management.v1.processor.model;

import java.util.HashMap;
import java.util.Map;

import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.FieldDefaults;

/**
 * Request payload sent to the python infrastructure ("pyInfra") services, e.g. the chunking
 * service. Maps logical input names to gzipped storage paths and names the path the service
 * should write its result to.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
public class PyInfraRequest {

	// Fix: without @Builder.Default, Lombok's builder ignores the field initializer,
	// so builder-created instances carried a null map and addTargetFilePath would NPE.
	@Builder.Default
	Map<String, String> targetFilePath = new HashMap<>();
	String responseFilePath;
	String dossierUuid; // can't be called dossierId, as pyInfra gets confused
	String fileUuid; // can't be called fileId, as pyInfra gets confused

	public PyInfraRequest(String dossierUuid, String fileUuid) {
		this.dossierUuid = dossierUuid;
		this.fileUuid = fileUuid;
		this.targetFilePath = new HashMap<>();
	}

	/** Registers an input file under {@code key}; the value is the gzipped storage id. */
	public void addTargetFilePath(String key, FileType fileType) {
		targetFilePath.put(key, StorageIdUtils.getStorageId(dossierUuid, fileUuid, fileType) + ".gz");
	}

	/** Points the service's response at the gzipped storage id computed for {@code fileType}. */
	public void setResponseFilePath(FileType fileType) {
		responseFilePath = StorageIdUtils.getStorageId(dossierUuid, fileUuid, fileType) + ".gz";
	}
}

View File

@ -1,10 +1,12 @@
package com.iqser.red.service.persistence.management.v1.processor.service;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -14,6 +16,8 @@ import com.iqser.red.service.persistence.management.v1.processor.exception.NotFo
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus;
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService;
import org.apache.commons.lang3.StringUtils;
@ -50,6 +54,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.persist
import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService;
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
import com.iqser.red.service.persistence.management.v1.processor.utils.FileModelMapper;
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeResult;
import com.iqser.red.service.persistence.service.v1.api.shared.model.FileAttribute;
@ -62,6 +67,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp
import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.ComponentLogMongoService;
import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService;
import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter;
import com.knecon.fforesight.llm.service.LlmNerMessage;
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
import jakarta.transaction.Transactional;
@ -284,6 +290,22 @@ public class FileStatusService {
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
return;
}
if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) {
var chunkingRequest = ChunkingRequestFactory.createChunkingRequest(dossierId, fileId);
setStatusFullProcessing(fileId);
log.info("Sending request to {} for {}/{}", CHUNKING_SERVICE_QUEUE, dossierId, fileId);
rabbitTemplate.convertAndSend(CHUNKING_SERVICE_QUEUE, chunkingRequest);
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
return;
}
if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.LLM_NER_ENTITIES)) {
log.info("Add file: {} from dossier {} to NER queue", fileId, dossierId);
addToLLMNerQueue(dossierId, fileId);
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
return;
}
if (settings.isAzureNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.AZURE_NER_ENTITIES)) {
log.info("Add file: {} from dossier {} to AZURE NER queue", fileId, dossierId);
addToAzureNerQueue(dossierId, fileId);
@ -488,6 +510,26 @@ public class FileStatusService {
}
// Marks the file as NER-analyzing and enqueues an LLM NER request carrying the
// storage ids of every artefact the LLM NER service reads plus the result target.
protected void addToLLMNerQueue(String dossierId, String fileId) {
    setStatusNerAnalyzing(fileId);
    var llmNerMessage = LlmNerMessage.builder()
            .identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, false))
            .chunksStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_CHUNKS))
            .documentPagesStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_PAGES))
            .documentStructureStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_STRUCTURE))
            .documentTextStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_TEXT))
            .documentPositionStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_POSITION))
            .resultStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.LLM_NER_ENTITIES))
            .build();
    rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_SERVICE_QUEUE, llmNerMessage, message -> {
        // priority 1 — presumably the low/default priority tier; confirm against queue config
        message.getMessageProperties().setPriority(1);
        return message;
    });
}
protected void addToAzureNerQueue(String dossierId, String fileId) {
setStatusNerAnalyzing(fileId);
@ -773,6 +815,7 @@ public class FileStatusService {
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.DOCUMENT_TEXT);
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.DOCUMENT_STRUCTURE);
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.NER_ENTITIES);
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.LLM_NER_ENTITIES);
}
addToAnalysisQueue(dossierId, fileId, priority, Sets.newHashSet(), false);

View File

@ -0,0 +1,28 @@
package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing;

import com.iqser.red.service.persistence.management.v1.processor.model.PyInfraRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;

import lombok.experimental.UtilityClass;

/**
 * Builds the {@link PyInfraRequest} for the chunking service: the four layout
 * artefacts go in as named inputs, the chunks file is the expected response.
 */
@UtilityClass
public class ChunkingRequestFactory {

	// Input names expected by the python chunking service.
	private static final String PAGES = "pages";
	private static final String TEXT_BLOCKS = "textBlocks";
	private static final String TEXT_POSITIONS = "positions";
	private static final String STRUCTURE = "structure";

	public PyInfraRequest createChunkingRequest(String dossierId, String fileId) {
		var request = new PyInfraRequest(dossierId, fileId);
		request.addTargetFilePath(PAGES, FileType.DOCUMENT_PAGES);
		request.addTargetFilePath(TEXT_BLOCKS, FileType.DOCUMENT_TEXT);
		request.addTargetFilePath(TEXT_POSITIONS, FileType.DOCUMENT_POSITION);
		request.addTargetFilePath(STRUCTURE, FileType.DOCUMENT_STRUCTURE);
		request.setResponseFilePath(FileType.DOCUMENT_CHUNKS);
		return request;
	}
}

View File

@ -0,0 +1,25 @@
package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing;
import java.util.List;
import java.util.Map;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.FieldDefaults;
/**
 * Response payload received from the python chunking service.
 * Mirrors the request's file paths and carries the produced chunks.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
public class ChunkingResponse {
// Input-name -> storage-path map; presumably echoed back from the request — TODO confirm against the python service.
Map<String, String> targetFilePath;
// Storage path the chunking result was written to.
String responseFilePath;
// One entry per produced chunk.
List<ChunkingResponseData> data;
}

View File

@ -0,0 +1,26 @@
package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing;
import java.util.List;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.FieldDefaults;
/**
 * A single chunk produced by the python chunking service.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
public class ChunkingResponseData {
// Identifier of the chunk; presumably sequential within the document — TODO confirm.
Integer chunkId;
// The chunk's text content.
String text;
// NOTE(review): semantics of types/treeIds are defined by the python service — confirm there.
List<String> types;
List<List<Integer>> treeIds;
// Embedding vector for the chunk text.
float[] embedding;
// Token count of the chunk text.
Integer tokenCount;
}

View File

@ -5,7 +5,6 @@ import java.util.Optional;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierEntity;
import com.iqser.red.service.persistence.management.v1.processor.service.FileManagementStorageService;
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
@ -23,7 +22,7 @@ public class LayoutParsingRequestFactory {
private String applicationType;
private final FileManagementStorageService fileManagementStorageService;
private final LayoutParsingRequestIdentifierService layoutParsingRequestIdentifierService;
private final FileManagementServiceSettings fileManagementServiceSettings;
@ -46,7 +45,7 @@ public class LayoutParsingRequestFactory {
return LayoutParsingRequest.builder()
.layoutParsingType(type)
.identifier(layoutParsingRequestIdentifierService.buildIdentifier(dossierId, fileId, priority))
.identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, priority))
.originFileStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.ORIGIN))
.imagesFileStorageId(optionalImageFileId)
.tablesFileStorageId(optionalTableFileId)

View File

@ -4,10 +4,10 @@ import java.util.Map;
import org.springframework.stereotype.Service;
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierEntity;
import lombok.experimental.UtilityClass;
@Service
public class LayoutParsingRequestIdentifierService {
@UtilityClass
public class QueueMessageIdentifierService {
private enum IdentifierNames {
DOSSIER_ID,

View File

@ -0,0 +1,84 @@
package com.iqser.red.service.persistence.management.v1.processor.service.queue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.PyInfraRequest;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;

import io.micrometer.observation.ObservationRegistry;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;

/**
 * Consumes chunking-service responses and dead-letters. A successful response pushes the
 * file back into the analysis pipeline; a dead-lettered message marks the analysis as failed.
 */
@Slf4j
@Service
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ChunkingMessageReceiver {

	FileStatusService fileStatusService;
	ObservationRegistry observationRegistry;
	FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
	ObjectMapper mapper;

	/** Successful chunking response: tag the trace and re-enter the analysis pipeline. */
	@RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE)
	public void receive(Message message) {
		PyInfraRequest pyInfraRequest = parsePyInfraRequest(message);
		addFileIdToTrace(pyInfraRequest.getFileUuid());
		log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
		fileStatusService.setStatusAnalyse(pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid(), false);
	}

	/** Dead-lettered chunking request: record the analysis failure for the file. */
	@RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_DLQ)
	public void handleDLQ(Message message) {
		PyInfraRequest pyInfraRequest = parsePyInfraRequest(message);
		addFileIdToTrace(pyInfraRequest.getFileUuid());
		log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
		fileStatusProcessingUpdateService.analysisFailed(pyInfraRequest.getDossierUuid(),
				pyInfraRequest.getFileUuid(),
				new FileErrorInfo("chunking service failed",
						MessagingConfiguration.CHUNKING_SERVICE_DLQ,
						"junker-chunker-service",
						OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
	}

	/**
	 * Deserializes the AMQP message body into a {@link PyInfraRequest}.
	 *
	 * @throws RuntimeException wrapping the {@link IOException} if the body cannot be parsed
	 */
	private PyInfraRequest parsePyInfraRequest(Message message) {
		try {
			return mapper.readValue(message.getBody(), PyInfraRequest.class);
		} catch (IOException e) {
			// Fix: decode the payload with an explicit charset and keep the stack trace in the log
			// (the original dropped the exception and used the platform default charset).
			log.error("Could not decode message {}", new String(message.getBody(), StandardCharsets.UTF_8), e);
			throw new RuntimeException(e);
		}
	}

	// This can be removed if tracing is implemented in python services.
	private void addFileIdToTrace(String fileId) {
		if (observationRegistry.getCurrentObservation() != null) {
			observationRegistry.getCurrentObservation().highCardinalityKeyValue("fileId", fileId);
		}
	}
}

View File

@ -17,7 +17,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.FileSta
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
import com.iqser.red.service.persistence.management.v1.processor.service.ImageSimilarityService;
import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.LayoutParsingRequestIdentifierService;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.SaasMigrationStatusPersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
@ -38,7 +38,6 @@ public class LayoutParsingFinishedMessageReceiver {
private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
private final ObjectMapper objectMapper;
private final LayoutParsingRequestIdentifierService layoutParsingRequestIdentifierService;
private final SaasMigrationStatusPersistenceService saasMigrationStatusPersistenceService;
private final SaasMigrationService saasMigrationService;
private final ImageSimilarityService imageSimilarityService;
@ -49,22 +48,23 @@ public class LayoutParsingFinishedMessageReceiver {
@RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE)
public void receive(LayoutParsingFinishedEvent response) {
if (saasMigrationStatusPersistenceService.isMigrating(layoutParsingRequestIdentifierService.parseFileId(response.identifier()))) {
saasMigrationService.handleLayoutParsingFinished(layoutParsingRequestIdentifierService.parseDossierId(response.identifier()),
layoutParsingRequestIdentifierService.parseFileId(response.identifier()));
var dossierId = QueueMessageIdentifierService.parseDossierId(response.identifier());
var fileId = QueueMessageIdentifierService.parseFileId(response.identifier());
log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE);
if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(response.identifier()))) {
saasMigrationService.handleLayoutParsingFinished(QueueMessageIdentifierService.parseDossierId(response.identifier()),
QueueMessageIdentifierService.parseFileId(response.identifier()));
return;
}
var templateId = "";
var dossierId = layoutParsingRequestIdentifierService.parseDossierId(response.identifier());
var fileId = layoutParsingRequestIdentifierService.parseFileId(response.identifier());
var storageId = StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_STRUCTURE);
fileStatusService.setStatusAnalyse(layoutParsingRequestIdentifierService.parseDossierId(response.identifier()),
layoutParsingRequestIdentifierService.parseFileId(response.identifier()),
layoutParsingRequestIdentifierService.parsePriority(response.identifier()));
fileStatusService.setStatusAnalyse(QueueMessageIdentifierService.parseDossierId(response.identifier()),
QueueMessageIdentifierService.parseFileId(response.identifier()),
QueueMessageIdentifierService.parsePriority(response.identifier()));
fileStatusService.updateLayoutProcessedTime(layoutParsingRequestIdentifierService.parseFileId(response.identifier()));
fileStatusService.updateLayoutProcessedTime(QueueMessageIdentifierService.parseFileId(response.identifier()));
websocketService.sendAnalysisEvent(dossierId, fileId, AnalyseStatus.LAYOUT_UPDATE, fileStatusService.getStatus(fileId).getNumberOfAnalyses() + 1);
try {
@ -74,7 +74,6 @@ public class LayoutParsingFinishedMessageReceiver {
throw e;
}
log.info("Received message {} in {}", response, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE);
}
@ -88,9 +87,9 @@ public class LayoutParsingFinishedMessageReceiver {
if (errorCause == null) {
errorCause = "Error occured during layout parsing!";
}
if (saasMigrationStatusPersistenceService.isMigrating(layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()))) {
saasMigrationService.handleError(layoutParsingRequestIdentifierService.parseDossierId(analyzeRequest.identifier()),
layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()),
if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()))) {
saasMigrationService.handleError(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()),
QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()),
errorCause,
LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE);
return;
@ -99,8 +98,8 @@ public class LayoutParsingFinishedMessageReceiver {
OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
log.info("Failed to process layout parsing request, errorCause: {}, timestamp: {}", errorCause, timestamp);
fileStatusProcessingUpdateService.analysisFailed(layoutParsingRequestIdentifierService.parseDossierId(analyzeRequest.identifier()),
layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()),
fileStatusProcessingUpdateService.analysisFailed(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()),
QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()),
new FileErrorInfo(errorCause, LAYOUT_PARSING_DLQ, "layoutparser-service", timestamp));
}

View File

@ -14,7 +14,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
import com.knecon.fforesight.llm.service.LlmNerMessage;
import com.knecon.fforesight.llm.service.LlmNerResponseMessage;
import io.micrometer.observation.ObservationRegistry;
import lombok.RequiredArgsConstructor;
@ -48,6 +51,19 @@ public class NerMessageReceiver {
}
// Handles successful LLM NER responses: tags the current trace with the file id
// and pushes the file back into the analysis pipeline.
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE)
public void receiveLLM(LlmNerResponseMessage message) {
    var identifier = message.getIdentifier();
    var dossierId = QueueMessageIdentifierService.parseDossierId(identifier);
    var fileId = QueueMessageIdentifierService.parseFileId(identifier);
    addFileIdToTrace(fileId);
    log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
    fileStatusService.setStatusAnalyse(dossierId, fileId, false);
}
@RabbitListener(queues = MessagingConfiguration.NER_SERVICE_DLQ)
public void handleDLQMessage(Message failedMessage) throws IOException {
@ -67,6 +83,26 @@ public class NerMessageReceiver {
}
// Handles dead-lettered LLM NER requests: records the analysis failure for the file.
@RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_DLQ)
public void handleLLMDLQMessage(Message failedMessage) throws IOException {
    LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class);
    String dossierId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier());
    // Fix: fileId was parsed with parseDossierId (copy-paste error), so the failure was
    // recorded and traced against the dossier id instead of the file id.
    String fileId = QueueMessageIdentifierService.parseFileId(entityResponse.getIdentifier());
    addFileIdToTrace(fileId);
    log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_DLQ, dossierId, fileId);
    // NOTE(review): component reported as "entity-recognition-service-v3" — confirm this is
    // the intended component name for the LLM NER pipeline rather than the classic NER service.
    fileStatusProcessingUpdateService.analysisFailed(dossierId,
            fileId,
            new FileErrorInfo("llm ner service failed",
                    MessagingConfiguration.LLM_NER_SERVICE_DLQ,
                    "entity-recognition-service-v3",
                    OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}
// This can be removed if tracing is implemented in python services.
private void addFileIdToTrace(String fileId) {

View File

@ -25,6 +25,7 @@ public class FileManagementServiceSettings {
private boolean imageServiceEnabled = true;
private boolean nerServiceEnabled = true;
private boolean azureNerServiceEnabled;
private boolean llmNerServiceEnabled;
private boolean visualLayoutParsingEnabled;
private boolean storeImageFile = true;

View File

@ -11,6 +11,7 @@ public enum FileType {
SIMPLIFIED_TEXT(".json"),
TEXT(".json"), // deprecated file type, only present in legacy migrations
NER_ENTITIES(".json"),
LLM_NER_ENTITIES(".json"),
AZURE_NER_ENTITIES(".json"),
IMAGE_INFO(".json"),
IMPORTED_REDACTIONS(".json"),
@ -26,6 +27,7 @@ public enum FileType {
DOCUMENT_STRUCTURE(".json"),
DOCUMENT_POSITION(".json"),
DOCUMENT_PAGES(".json"),
DOCUMENT_CHUNKS(".json"),
ENTITY_LOG(".json"),
ENTITY_LOG_BAK(".json"),
COMPONENT_LOG(".json"),