Spike: LLM NER
This commit is contained in:
parent
9eb3ef0181
commit
a99b2e8165
@ -42,6 +42,7 @@ dependencies {
|
||||
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
|
||||
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
|
||||
}
|
||||
implementation("com.knecon.fforesight:llm-service-api:1.10.0")
|
||||
api("com.knecon.fforesight:jobs-commons:0.10.0")
|
||||
api("com.knecon.fforesight:database-tenant-commons:0.24.0")
|
||||
api("com.knecon.fforesight:keycloak-commons:0.29.0")
|
||||
|
||||
@ -8,6 +8,8 @@ import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.knecon.fforesight.llm.service.QueueNames;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Configuration
|
||||
@ -54,6 +56,14 @@ public class MessagingConfiguration {
|
||||
public static final String NER_SERVICE_RESPONSE_QUEUE = "entity_response_queue";
|
||||
public static final String NER_SERVICE_DLQ = "entity_dead_letter_queue";
|
||||
|
||||
public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue";
|
||||
public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue";
|
||||
public static final String CHUNKING_SERVICE_DLQ = "chunking_service_dlq";
|
||||
|
||||
public static final String LLM_NER_SERVICE_QUEUE = QueueNames.LLM_NER_SERVICE_QUEUE;
|
||||
public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE;
|
||||
public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ;
|
||||
|
||||
public static final String AZURE_NER_SERVICE_QUEUE = "azure_entity_request_queue";
|
||||
public static final String AZURE_NER_SERVICE_RESPONSE_QUEUE = "azure_entity_response_queue";
|
||||
public static final String AZURE_NER_SERVICE_DLQ = "azure_entity_dead_letter_queue";
|
||||
@ -77,7 +87,6 @@ public class MessagingConfiguration {
|
||||
public static final String X_ERROR_INFO_HEADER = "x-error-message";
|
||||
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
|
||||
|
||||
|
||||
public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dead_letter_queue";
|
||||
|
||||
// --- Saas Migration, can be removed later ----
|
||||
@ -156,6 +165,54 @@ public class MessagingConfiguration {
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue llmNerRequestQueue() {
|
||||
|
||||
return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue llmNerResponseQueue() {
|
||||
|
||||
return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue llmNerResponseDLQ() {
|
||||
|
||||
return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue chunkingRequestQueue() {
|
||||
|
||||
return QueueBuilder.durable(CHUNKING_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue chunkingResponseQueue() {
|
||||
|
||||
return QueueBuilder.durable(CHUNKING_SERVICE_RESPONSE_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue chunkingDLQ() {
|
||||
|
||||
return QueueBuilder.durable(CHUNKING_SERVICE_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue imageRequestQueue() {
|
||||
|
||||
@ -276,19 +333,18 @@ public class MessagingConfiguration {
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue ocrResponseQueue() {
|
||||
|
||||
return QueueBuilder.durable(OCR_RESPONSE_QUEUE)
|
||||
.build();
|
||||
return QueueBuilder.durable(OCR_RESPONSE_QUEUE).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue ocrDLQ() {
|
||||
|
||||
return QueueBuilder.durable(OCR_DLQ)
|
||||
.build();
|
||||
return QueueBuilder.durable(OCR_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -22,10 +22,12 @@ public class FileExchangeNames {
|
||||
|
||||
public static Definition STRUCTURE = new Definition(FileType.DOCUMENT_STRUCTURE);
|
||||
public static Definition PAGES = new Definition(FileType.DOCUMENT_PAGES);
|
||||
public static Definition CHUNKS = new Definition(FileType.DOCUMENT_CHUNKS);
|
||||
public static Definition TEXT = new Definition(FileType.DOCUMENT_TEXT);
|
||||
public static Definition POSITIONS = new Definition(FileType.DOCUMENT_POSITION);
|
||||
public static Definition SIMPLIFIED_TEXT = new Definition(FileType.SIMPLIFIED_TEXT);
|
||||
public static Definition NER_ENTITIES = new Definition(FileType.NER_ENTITIES);
|
||||
public static Definition LLM_NER_ENTITIES = new Definition(FileType.LLM_NER_ENTITIES);
|
||||
public static Definition AZURE_NER_ENTITIES = new Definition(FileType.AZURE_NER_ENTITIES);
|
||||
|
||||
public static Definition TABLES = new Definition(FileType.TABLES);
|
||||
|
||||
@ -62,8 +62,10 @@ public class FileExportService {
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.POSITIONS);
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.PAGES);
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.NER_ENTITIES);
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.LLM_NER_ENTITIES);
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.AZURE_NER_ENTITIES);
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.SIMPLIFIED_TEXT);
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.CHUNKS);
|
||||
}
|
||||
|
||||
addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.FIGURE);
|
||||
|
||||
@ -100,6 +100,7 @@ public class FileExchangeArchiveReader {
|
||||
FileExchangeNames.SIMPLIFIED_TEXT,
|
||||
FileExchangeNames.NER_ENTITIES,
|
||||
FileExchangeNames.AZURE_NER_ENTITIES,
|
||||
FileExchangeNames.LLM_NER_ENTITIES,
|
||||
FileExchangeNames.TABLES,
|
||||
FileExchangeNames.IMAGES,
|
||||
FileExchangeNames.VISUAL_LAYOUT,
|
||||
@ -107,6 +108,7 @@ public class FileExchangeArchiveReader {
|
||||
FileExchangeNames.HIGHLIGHTS,
|
||||
FileExchangeNames.FIGURE,
|
||||
FileExchangeNames.ORIGIN,
|
||||
FileExchangeNames.CHUNKS,
|
||||
FileExchangeNames.UNTOUCHED,
|
||||
FileExchangeNames.VIEWER);
|
||||
if (optionalFileType.isPresent()) {
|
||||
|
||||
@ -0,0 +1,48 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE)
|
||||
public class PyInfraRequest {
|
||||
|
||||
Map<String, String> targetFilePath = new HashMap<>();
|
||||
String responseFilePath;
|
||||
String dossierUuid; // can't be called dossierId, as pyInfra gets confused
|
||||
String fileUuid; // can't be called fileId, as pyInfra gets confused
|
||||
|
||||
|
||||
public PyInfraRequest(String dossierUuid, String fileUuid) {
|
||||
|
||||
this.dossierUuid = dossierUuid;
|
||||
this.fileUuid = fileUuid;
|
||||
this.targetFilePath = new HashMap<>();
|
||||
}
|
||||
|
||||
|
||||
public void addTargetFilePath(String key, FileType fileType) {
|
||||
|
||||
targetFilePath.put(key, StorageIdUtils.getStorageId(dossierUuid, fileUuid, fileType) + ".gz");
|
||||
}
|
||||
|
||||
|
||||
public void setResponseFilePath(FileType fileType) {
|
||||
|
||||
responseFilePath = StorageIdUtils.getStorageId(dossierUuid, fileUuid, fileType) + ".gz";
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,10 +1,12 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service;
|
||||
|
||||
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE;
|
||||
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -14,6 +16,8 @@ import com.iqser.red.service.persistence.management.v1.processor.exception.NotFo
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -50,6 +54,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.persist
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.FileModelMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeResult;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.FileAttribute;
|
||||
@ -62,6 +67,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.ComponentLogMongoService;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService;
|
||||
import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter;
|
||||
import com.knecon.fforesight.llm.service.LlmNerMessage;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
|
||||
|
||||
import jakarta.transaction.Transactional;
|
||||
@ -284,6 +290,22 @@ public class FileStatusService {
|
||||
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
|
||||
return;
|
||||
}
|
||||
|
||||
if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) {
|
||||
var chunkingRequest = ChunkingRequestFactory.createChunkingRequest(dossierId, fileId);
|
||||
setStatusFullProcessing(fileId);
|
||||
log.info("Sending request to {} for {}/{}", CHUNKING_SERVICE_QUEUE, dossierId, fileId);
|
||||
rabbitTemplate.convertAndSend(CHUNKING_SERVICE_QUEUE, chunkingRequest);
|
||||
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
|
||||
return;
|
||||
}
|
||||
|
||||
if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.LLM_NER_ENTITIES)) {
|
||||
log.info("Add file: {} from dossier {} to NER queue", fileId, dossierId);
|
||||
addToLLMNerQueue(dossierId, fileId);
|
||||
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
|
||||
return;
|
||||
}
|
||||
if (settings.isAzureNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.AZURE_NER_ENTITIES)) {
|
||||
log.info("Add file: {} from dossier {} to AZURE NER queue", fileId, dossierId);
|
||||
addToAzureNerQueue(dossierId, fileId);
|
||||
@ -488,6 +510,26 @@ public class FileStatusService {
|
||||
}
|
||||
|
||||
|
||||
protected void addToLLMNerQueue(String dossierId, String fileId) {
|
||||
|
||||
setStatusNerAnalyzing(fileId);
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_SERVICE_QUEUE,
|
||||
LlmNerMessage.builder()
|
||||
.identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, false))
|
||||
.chunksStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_CHUNKS))
|
||||
.documentPagesStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_PAGES))
|
||||
.documentStructureStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_STRUCTURE))
|
||||
.documentTextStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_TEXT))
|
||||
.documentPositionStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_POSITION))
|
||||
.resultStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.LLM_NER_ENTITIES))
|
||||
.build(),
|
||||
message -> {
|
||||
message.getMessageProperties().setPriority(1);
|
||||
return message;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
protected void addToAzureNerQueue(String dossierId, String fileId) {
|
||||
|
||||
setStatusNerAnalyzing(fileId);
|
||||
@ -773,6 +815,7 @@ public class FileStatusService {
|
||||
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.DOCUMENT_TEXT);
|
||||
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.DOCUMENT_STRUCTURE);
|
||||
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.NER_ENTITIES);
|
||||
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.LLM_NER_ENTITIES);
|
||||
}
|
||||
|
||||
addToAnalysisQueue(dossierId, fileId, priority, Sets.newHashSet(), false);
|
||||
|
||||
@ -0,0 +1,28 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.PyInfraRequest;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;
|
||||
|
||||
import lombok.experimental.UtilityClass;
|
||||
|
||||
@UtilityClass
|
||||
public class ChunkingRequestFactory {
|
||||
|
||||
private static final String PAGES = "pages";
|
||||
private static final String TEXT_BLOCKS = "textBlocks";
|
||||
private static final String TEXT_POSITIONS = "positions";
|
||||
private static final String STRUCTURE = "structure";
|
||||
|
||||
|
||||
public PyInfraRequest createChunkingRequest(String dossierId, String fileId) {
|
||||
|
||||
PyInfraRequest pyInfraRequest = new PyInfraRequest(dossierId, fileId);
|
||||
pyInfraRequest.addTargetFilePath(PAGES, FileType.DOCUMENT_PAGES);
|
||||
pyInfraRequest.addTargetFilePath(TEXT_BLOCKS, FileType.DOCUMENT_TEXT);
|
||||
pyInfraRequest.addTargetFilePath(TEXT_POSITIONS, FileType.DOCUMENT_POSITION);
|
||||
pyInfraRequest.addTargetFilePath(STRUCTURE, FileType.DOCUMENT_STRUCTURE);
|
||||
pyInfraRequest.setResponseFilePath(FileType.DOCUMENT_CHUNKS);
|
||||
return pyInfraRequest;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE)
|
||||
public class ChunkingResponse {
|
||||
|
||||
Map<String, String> targetFilePath;
|
||||
String responseFilePath;
|
||||
|
||||
List<ChunkingResponseData> data;
|
||||
|
||||
}
|
||||
@ -0,0 +1,26 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE)
|
||||
public class ChunkingResponseData {
|
||||
|
||||
Integer chunkId;
|
||||
String text;
|
||||
List<String> types;
|
||||
List<List<Integer>> treeIds;
|
||||
float[] embedding;
|
||||
Integer tokenCount;
|
||||
|
||||
}
|
||||
@ -5,7 +5,6 @@ import java.util.Optional;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileManagementStorageService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
|
||||
@ -23,7 +22,7 @@ public class LayoutParsingRequestFactory {
|
||||
private String applicationType;
|
||||
|
||||
private final FileManagementStorageService fileManagementStorageService;
|
||||
private final LayoutParsingRequestIdentifierService layoutParsingRequestIdentifierService;
|
||||
|
||||
private final FileManagementServiceSettings fileManagementServiceSettings;
|
||||
|
||||
|
||||
@ -46,7 +45,7 @@ public class LayoutParsingRequestFactory {
|
||||
|
||||
return LayoutParsingRequest.builder()
|
||||
.layoutParsingType(type)
|
||||
.identifier(layoutParsingRequestIdentifierService.buildIdentifier(dossierId, fileId, priority))
|
||||
.identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, priority))
|
||||
.originFileStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.ORIGIN))
|
||||
.imagesFileStorageId(optionalImageFileId)
|
||||
.tablesFileStorageId(optionalTableFileId)
|
||||
|
||||
@ -4,10 +4,10 @@ import java.util.Map;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierEntity;
|
||||
import lombok.experimental.UtilityClass;
|
||||
|
||||
@Service
|
||||
public class LayoutParsingRequestIdentifierService {
|
||||
@UtilityClass
|
||||
public class QueueMessageIdentifierService {
|
||||
|
||||
private enum IdentifierNames {
|
||||
DOSSIER_ID,
|
||||
@ -0,0 +1,84 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.PyInfraRequest;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||
|
||||
import io.micrometer.observation.ObservationRegistry;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
|
||||
public class ChunkingMessageReceiver {
|
||||
|
||||
FileStatusService fileStatusService;
|
||||
ObservationRegistry observationRegistry;
|
||||
FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
|
||||
ObjectMapper mapper;
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE)
|
||||
public void receive(Message message) {
|
||||
|
||||
PyInfraRequest pyInfraRequest = parsePyInfraRequest(message);
|
||||
addFileIdToTrace(pyInfraRequest.getFileUuid());
|
||||
|
||||
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
|
||||
fileStatusService.setStatusAnalyse(pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid(), false);
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_DLQ)
|
||||
public void handleDLQ(Message message) {
|
||||
|
||||
PyInfraRequest pyInfraRequest = parsePyInfraRequest(message);
|
||||
addFileIdToTrace(pyInfraRequest.getFileUuid());
|
||||
|
||||
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
|
||||
fileStatusProcessingUpdateService.analysisFailed(pyInfraRequest.getDossierUuid(),
|
||||
pyInfraRequest.getFileUuid(),
|
||||
new FileErrorInfo("chunking service failed",
|
||||
MessagingConfiguration.CHUNKING_SERVICE_DLQ,
|
||||
"junker-chunker-service",
|
||||
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
|
||||
}
|
||||
|
||||
|
||||
private PyInfraRequest parsePyInfraRequest(Message message) {
|
||||
|
||||
PyInfraRequest pyInfraRequest = null;
|
||||
try {
|
||||
pyInfraRequest = mapper.readValue(message.getBody(), PyInfraRequest.class);
|
||||
} catch (IOException e) {
|
||||
log.error("Could not decode message {}", new String(message.getBody()));
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return pyInfraRequest;
|
||||
}
|
||||
|
||||
|
||||
// This can be removed if tracing is implemented in python services.
|
||||
private void addFileIdToTrace(String fileId) {
|
||||
|
||||
if (observationRegistry.getCurrentObservation() != null) {
|
||||
observationRegistry.getCurrentObservation().highCardinalityKeyValue("fileId", fileId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -17,7 +17,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.FileSta
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.ImageSimilarityService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.LayoutParsingRequestIdentifierService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.SaasMigrationStatusPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||
@ -38,7 +38,6 @@ public class LayoutParsingFinishedMessageReceiver {
|
||||
private final FileStatusService fileStatusService;
|
||||
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final LayoutParsingRequestIdentifierService layoutParsingRequestIdentifierService;
|
||||
private final SaasMigrationStatusPersistenceService saasMigrationStatusPersistenceService;
|
||||
private final SaasMigrationService saasMigrationService;
|
||||
private final ImageSimilarityService imageSimilarityService;
|
||||
@ -49,22 +48,23 @@ public class LayoutParsingFinishedMessageReceiver {
|
||||
@RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE)
|
||||
public void receive(LayoutParsingFinishedEvent response) {
|
||||
|
||||
if (saasMigrationStatusPersistenceService.isMigrating(layoutParsingRequestIdentifierService.parseFileId(response.identifier()))) {
|
||||
saasMigrationService.handleLayoutParsingFinished(layoutParsingRequestIdentifierService.parseDossierId(response.identifier()),
|
||||
layoutParsingRequestIdentifierService.parseFileId(response.identifier()));
|
||||
var dossierId = QueueMessageIdentifierService.parseDossierId(response.identifier());
|
||||
var fileId = QueueMessageIdentifierService.parseFileId(response.identifier());
|
||||
log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE);
|
||||
if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(response.identifier()))) {
|
||||
saasMigrationService.handleLayoutParsingFinished(QueueMessageIdentifierService.parseDossierId(response.identifier()),
|
||||
QueueMessageIdentifierService.parseFileId(response.identifier()));
|
||||
return;
|
||||
}
|
||||
|
||||
var templateId = "";
|
||||
var dossierId = layoutParsingRequestIdentifierService.parseDossierId(response.identifier());
|
||||
var fileId = layoutParsingRequestIdentifierService.parseFileId(response.identifier());
|
||||
var storageId = StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_STRUCTURE);
|
||||
|
||||
fileStatusService.setStatusAnalyse(layoutParsingRequestIdentifierService.parseDossierId(response.identifier()),
|
||||
layoutParsingRequestIdentifierService.parseFileId(response.identifier()),
|
||||
layoutParsingRequestIdentifierService.parsePriority(response.identifier()));
|
||||
fileStatusService.setStatusAnalyse(QueueMessageIdentifierService.parseDossierId(response.identifier()),
|
||||
QueueMessageIdentifierService.parseFileId(response.identifier()),
|
||||
QueueMessageIdentifierService.parsePriority(response.identifier()));
|
||||
|
||||
fileStatusService.updateLayoutProcessedTime(layoutParsingRequestIdentifierService.parseFileId(response.identifier()));
|
||||
fileStatusService.updateLayoutProcessedTime(QueueMessageIdentifierService.parseFileId(response.identifier()));
|
||||
websocketService.sendAnalysisEvent(dossierId, fileId, AnalyseStatus.LAYOUT_UPDATE, fileStatusService.getStatus(fileId).getNumberOfAnalyses() + 1);
|
||||
|
||||
try {
|
||||
@ -74,7 +74,6 @@ public class LayoutParsingFinishedMessageReceiver {
|
||||
throw e;
|
||||
}
|
||||
|
||||
log.info("Received message {} in {}", response, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE);
|
||||
}
|
||||
|
||||
|
||||
@ -88,9 +87,9 @@ public class LayoutParsingFinishedMessageReceiver {
|
||||
if (errorCause == null) {
|
||||
errorCause = "Error occured during layout parsing!";
|
||||
}
|
||||
if (saasMigrationStatusPersistenceService.isMigrating(layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()))) {
|
||||
saasMigrationService.handleError(layoutParsingRequestIdentifierService.parseDossierId(analyzeRequest.identifier()),
|
||||
layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()),
|
||||
if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()))) {
|
||||
saasMigrationService.handleError(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()),
|
||||
QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()),
|
||||
errorCause,
|
||||
LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE);
|
||||
return;
|
||||
@ -99,8 +98,8 @@ public class LayoutParsingFinishedMessageReceiver {
|
||||
OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
|
||||
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
||||
log.info("Failed to process layout parsing request, errorCause: {}, timestamp: {}", errorCause, timestamp);
|
||||
fileStatusProcessingUpdateService.analysisFailed(layoutParsingRequestIdentifierService.parseDossierId(analyzeRequest.identifier()),
|
||||
layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()),
|
||||
fileStatusProcessingUpdateService.analysisFailed(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()),
|
||||
QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()),
|
||||
new FileErrorInfo(errorCause, LAYOUT_PARSING_DLQ, "layoutparser-service", timestamp));
|
||||
|
||||
}
|
||||
|
||||
@ -14,7 +14,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||
import com.knecon.fforesight.llm.service.LlmNerMessage;
|
||||
import com.knecon.fforesight.llm.service.LlmNerResponseMessage;
|
||||
|
||||
import io.micrometer.observation.ObservationRegistry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -48,6 +51,19 @@ public class NerMessageReceiver {
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE)
|
||||
public void receiveLLM(LlmNerResponseMessage message) {
|
||||
|
||||
String dossierId = QueueMessageIdentifierService.parseDossierId(message.getIdentifier());
|
||||
String fileId = QueueMessageIdentifierService.parseFileId(message.getIdentifier());
|
||||
addFileIdToTrace(fileId);
|
||||
|
||||
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.NER_SERVICE_DLQ)
|
||||
public void handleDLQMessage(Message failedMessage) throws IOException {
|
||||
|
||||
@ -67,6 +83,26 @@ public class NerMessageReceiver {
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_DLQ)
|
||||
public void handleLLMDLQMessage(Message failedMessage) throws IOException {
|
||||
|
||||
LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class);
|
||||
|
||||
String dossierId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier());
|
||||
String fileId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier());
|
||||
|
||||
addFileIdToTrace(fileId);
|
||||
|
||||
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_DLQ, dossierId, fileId);
|
||||
fileStatusProcessingUpdateService.analysisFailed(dossierId,
|
||||
fileId,
|
||||
new FileErrorInfo("llm ner service failed",
|
||||
MessagingConfiguration.LLM_NER_SERVICE_DLQ,
|
||||
"entity-recognition-service-v3",
|
||||
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
|
||||
}
|
||||
|
||||
|
||||
// This can be removed if tracing is implemented in python services.
|
||||
private void addFileIdToTrace(String fileId) {
|
||||
|
||||
|
||||
@ -25,6 +25,7 @@ public class FileManagementServiceSettings {
|
||||
private boolean imageServiceEnabled = true;
|
||||
private boolean nerServiceEnabled = true;
|
||||
private boolean azureNerServiceEnabled;
|
||||
private boolean llmNerServiceEnabled;
|
||||
private boolean visualLayoutParsingEnabled;
|
||||
|
||||
private boolean storeImageFile = true;
|
||||
|
||||
@ -11,6 +11,7 @@ public enum FileType {
|
||||
SIMPLIFIED_TEXT(".json"),
|
||||
TEXT(".json"), // deprecated file type, only present in legacy migrations
|
||||
NER_ENTITIES(".json"),
|
||||
LLM_NER_ENTITIES(".json"),
|
||||
AZURE_NER_ENTITIES(".json"),
|
||||
IMAGE_INFO(".json"),
|
||||
IMPORTED_REDACTIONS(".json"),
|
||||
@ -26,6 +27,7 @@ public enum FileType {
|
||||
DOCUMENT_STRUCTURE(".json"),
|
||||
DOCUMENT_POSITION(".json"),
|
||||
DOCUMENT_PAGES(".json"),
|
||||
DOCUMENT_CHUNKS(".json"),
|
||||
ENTITY_LOG(".json"),
|
||||
ENTITY_LOG_BAK(".json"),
|
||||
COMPONENT_LOG(".json"),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user