From a99b2e816590607229c7e363442b090c28149eaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kilian=20Sch=C3=BCttler?= Date: Tue, 27 Aug 2024 10:28:29 +0200 Subject: [PATCH] Spike: LLM NER --- .../build.gradle.kts | 1 + .../configuration/MessagingConfiguration.java | 66 +++++++++++++-- .../dataexchange/FileExchangeNames.java | 2 + .../service/FileExportService.java | 2 + .../zipreaders/FileExchangeArchiveReader.java | 2 + .../v1/processor/model/PyInfraRequest.java | 48 +++++++++++ .../processor/service/FileStatusService.java | 43 ++++++++++ .../layoutparsing/ChunkingRequestFactory.java | 28 +++++++ .../layoutparsing/ChunkingResponse.java | 25 ++++++ .../layoutparsing/ChunkingResponseData.java | 26 ++++++ .../LayoutParsingRequestFactory.java | 5 +- ...ava => QueueMessageIdentifierService.java} | 6 +- .../queue/ChunkingMessageReceiver.java | 84 +++++++++++++++++++ .../LayoutParsingFinishedMessageReceiver.java | 33 ++++---- .../service/queue/NerMessageReceiver.java | 36 ++++++++ .../FileManagementServiceSettings.java | 1 + .../dossier/file/FileType.java | 2 + 17 files changed, 382 insertions(+), 28 deletions(-) create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/PyInfraRequest.java create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingRequestFactory.java create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponse.java create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponseData.java rename persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/{LayoutParsingRequestIdentifierService.java => QueueMessageIdentifierService.java} (86%) create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java diff --git a/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts b/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts index 7af9ae075..00ffac61d 100644 --- a/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts +++ b/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts @@ -42,6 +42,7 @@ dependencies { exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1") exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1") } + implementation("com.knecon.fforesight:llm-service-api:1.10.0") api("com.knecon.fforesight:jobs-commons:0.10.0") api("com.knecon.fforesight:database-tenant-commons:0.24.0") api("com.knecon.fforesight:keycloak-commons:0.29.0") diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java index 190c0de96..249349aa1 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java @@ -8,6 +8,8 @@ import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import com.knecon.fforesight.llm.service.QueueNames; + import lombok.RequiredArgsConstructor; @Configuration @@ -54,6 +56,14 @@ public class MessagingConfiguration { public static final String NER_SERVICE_RESPONSE_QUEUE = "entity_response_queue"; public static final String NER_SERVICE_DLQ = "entity_dead_letter_queue"; + public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue"; + public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue"; + public static final String CHUNKING_SERVICE_DLQ = "chunking_service_dlq"; + + public static final String LLM_NER_SERVICE_QUEUE = QueueNames.LLM_NER_SERVICE_QUEUE; + public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE; + public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ; + public static final String AZURE_NER_SERVICE_QUEUE = "azure_entity_request_queue"; public static final String AZURE_NER_SERVICE_RESPONSE_QUEUE = "azure_entity_response_queue"; public static final String AZURE_NER_SERVICE_DLQ = "azure_entity_dead_letter_queue"; @@ -77,7 +87,6 @@ public class MessagingConfiguration { public static final String X_ERROR_INFO_HEADER = "x-error-message"; public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp"; - public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dead_letter_queue"; // --- Saas Migration, can be removed later ---- @@ -156,6 +165,54 @@ public class MessagingConfiguration { } + @Bean + public Queue llmNerRequestQueue() { + + return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build(); + } + + + @Bean + public Queue llmNerResponseQueue() { + + return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ) + .build(); + } + + + @Bean + public Queue llmNerResponseDLQ() { + + return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build(); + } + + + @Bean + public Queue chunkingRequestQueue() { + + return QueueBuilder.durable(CHUNKING_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ).build(); + } + + + @Bean + public Queue chunkingResponseQueue() { + + return QueueBuilder.durable(CHUNKING_SERVICE_RESPONSE_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ) + .build(); + } + + + @Bean + public Queue chunkingDLQ() { + + return QueueBuilder.durable(CHUNKING_SERVICE_DLQ).build(); + } + + @Bean public Queue imageRequestQueue() { @@ -276,19 +333,18 @@ public class MessagingConfiguration { .build(); } + @Bean public Queue ocrResponseQueue() { - return QueueBuilder.durable(OCR_RESPONSE_QUEUE) - .build(); + return QueueBuilder.durable(OCR_RESPONSE_QUEUE).build(); } @Bean public Queue ocrDLQ() { - return QueueBuilder.durable(OCR_DLQ) - .build(); + return QueueBuilder.durable(OCR_DLQ).build(); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/FileExchangeNames.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/FileExchangeNames.java index f59f4e1ab..9b5c2e11c 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/FileExchangeNames.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/FileExchangeNames.java @@ -22,10 +22,12 @@ public class FileExchangeNames { public static Definition STRUCTURE = new Definition(FileType.DOCUMENT_STRUCTURE); public static Definition PAGES = new Definition(FileType.DOCUMENT_PAGES); + public static Definition CHUNKS = new Definition(FileType.DOCUMENT_CHUNKS); public static Definition TEXT = new Definition(FileType.DOCUMENT_TEXT); public static Definition POSITIONS = new Definition(FileType.DOCUMENT_POSITION); public static Definition SIMPLIFIED_TEXT = new Definition(FileType.SIMPLIFIED_TEXT); public static Definition NER_ENTITIES = new Definition(FileType.NER_ENTITIES); + public static Definition LLM_NER_ENTITIES = new Definition(FileType.LLM_NER_ENTITIES); public static Definition AZURE_NER_ENTITIES = new Definition(FileType.AZURE_NER_ENTITIES); public static Definition TABLES = new Definition(FileType.TABLES); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExportService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExportService.java index cb9237978..f78a199f1 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExportService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExportService.java @@ -62,8 +62,10 @@ public class FileExportService { addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.POSITIONS); addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.PAGES); addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.NER_ENTITIES); + addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.LLM_NER_ENTITIES); addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.AZURE_NER_ENTITIES); addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.SIMPLIFIED_TEXT); + addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.CHUNKS); } addArchiveModelForStorageFile(archiver, file, fileFolder, FileExchangeNames.FIGURE); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/zipreaders/FileExchangeArchiveReader.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/zipreaders/FileExchangeArchiveReader.java index 488d5fa3d..d7c105e9a 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/zipreaders/FileExchangeArchiveReader.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/zipreaders/FileExchangeArchiveReader.java @@ -100,6 +100,7 @@ public class FileExchangeArchiveReader { FileExchangeNames.SIMPLIFIED_TEXT, FileExchangeNames.NER_ENTITIES, FileExchangeNames.AZURE_NER_ENTITIES, + FileExchangeNames.LLM_NER_ENTITIES, FileExchangeNames.TABLES, FileExchangeNames.IMAGES, FileExchangeNames.VISUAL_LAYOUT, @@ -107,6 +108,7 @@ public class FileExchangeArchiveReader { FileExchangeNames.HIGHLIGHTS, FileExchangeNames.FIGURE, FileExchangeNames.ORIGIN, + FileExchangeNames.CHUNKS, FileExchangeNames.UNTOUCHED, FileExchangeNames.VIEWER); if (optionalFileType.isPresent()) { diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/PyInfraRequest.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/PyInfraRequest.java new file mode 100644 index 000000000..40d328727 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/PyInfraRequest.java @@ -0,0 +1,48 @@ +package com.iqser.red.service.persistence.management.v1.processor.model; + +import java.util.HashMap; +import java.util.Map; + +import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils; +import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.FieldDefaults; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE) +public class PyInfraRequest { + + Map targetFilePath = new HashMap<>(); + String responseFilePath; + String dossierUuid; // can't be called dossierId, as pyInfra gets confused + String fileUuid; // can't be called fileId, as pyInfra gets confused + + + public PyInfraRequest(String dossierUuid, String fileUuid) { + + this.dossierUuid = dossierUuid; + this.fileUuid = fileUuid; + this.targetFilePath = new HashMap<>(); + } + + + public void addTargetFilePath(String key, FileType fileType) { + + targetFilePath.put(key, StorageIdUtils.getStorageId(dossierUuid, fileUuid, fileType) + ".gz"); + } + + + public void setResponseFilePath(FileType fileType) { + + responseFilePath = StorageIdUtils.getStorageId(dossierUuid, fileUuid, fileType) + ".gz"; + } + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java index cb74b05c6..08c23a611 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java @@ -1,10 +1,12 @@ package com.iqser.red.service.persistence.management.v1.processor.service; +import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE; import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -14,6 +16,8 @@ import com.iqser.red.service.persistence.management.v1.processor.exception.NotFo import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus; import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType; +import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory; +import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService; import org.apache.commons.lang3.StringUtils; @@ -50,6 +54,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.persist import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService; import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings; import com.iqser.red.service.persistence.management.v1.processor.utils.FileModelMapper; +import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils; import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequest; import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeResult; import com.iqser.red.service.persistence.service.v1.api.shared.model.FileAttribute; @@ -62,6 +67,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.ComponentLogMongoService; import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService; import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter; +import com.knecon.fforesight.llm.service.LlmNerMessage; import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest; import jakarta.transaction.Transactional; @@ -284,6 +290,22 @@ public class FileStatusService { sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity); return; } + + if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) { + var chunkingRequest = ChunkingRequestFactory.createChunkingRequest(dossierId, fileId); + setStatusFullProcessing(fileId); + log.info("Sending request to {} for {}/{}", CHUNKING_SERVICE_QUEUE, dossierId, fileId); + rabbitTemplate.convertAndSend(CHUNKING_SERVICE_QUEUE, chunkingRequest); + sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity); + return; + } + + if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.LLM_NER_ENTITIES)) { + log.info("Add file: {} from dossier {} to NER queue", fileId, dossierId); + addToLLMNerQueue(dossierId, fileId); + sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity); + return; + } if (settings.isAzureNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.AZURE_NER_ENTITIES)) { log.info("Add file: {} from dossier {} to AZURE NER queue", fileId, dossierId); addToAzureNerQueue(dossierId, fileId); @@ -488,6 +510,26 @@ public class FileStatusService { } + protected void addToLLMNerQueue(String dossierId, String fileId) { + + setStatusNerAnalyzing(fileId); + rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_SERVICE_QUEUE, + LlmNerMessage.builder() + .identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, false)) + .chunksStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) + .documentPagesStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_PAGES)) + .documentStructureStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_STRUCTURE)) + .documentTextStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_TEXT)) + .documentPositionStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_POSITION)) + .resultStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.LLM_NER_ENTITIES)) + .build(), + message -> { + message.getMessageProperties().setPriority(1); + return message; + }); + } + + protected void addToAzureNerQueue(String dossierId, String fileId) { setStatusNerAnalyzing(fileId); @@ -773,6 +815,7 @@ public class FileStatusService { fileManagementStorageService.deleteObject(dossierId, fileId, FileType.DOCUMENT_TEXT); fileManagementStorageService.deleteObject(dossierId, fileId, FileType.DOCUMENT_STRUCTURE); fileManagementStorageService.deleteObject(dossierId, fileId, FileType.NER_ENTITIES); + fileManagementStorageService.deleteObject(dossierId, fileId, FileType.LLM_NER_ENTITIES); } addToAnalysisQueue(dossierId, fileId, priority, Sets.newHashSet(), false); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingRequestFactory.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingRequestFactory.java new file mode 100644 index 000000000..bfcc11f53 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingRequestFactory.java @@ -0,0 +1,28 @@ +package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing; + +import com.iqser.red.service.persistence.management.v1.processor.model.PyInfraRequest; +import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class ChunkingRequestFactory { + + private static final String PAGES = "pages"; + private static final String TEXT_BLOCKS = "textBlocks"; + private static final String TEXT_POSITIONS = "positions"; + private static final String STRUCTURE = "structure"; + + + public PyInfraRequest createChunkingRequest(String dossierId, String fileId) { + + PyInfraRequest pyInfraRequest = new PyInfraRequest(dossierId, fileId); + pyInfraRequest.addTargetFilePath(PAGES, FileType.DOCUMENT_PAGES); + pyInfraRequest.addTargetFilePath(TEXT_BLOCKS, FileType.DOCUMENT_TEXT); + pyInfraRequest.addTargetFilePath(TEXT_POSITIONS, FileType.DOCUMENT_POSITION); + pyInfraRequest.addTargetFilePath(STRUCTURE, FileType.DOCUMENT_STRUCTURE); + pyInfraRequest.setResponseFilePath(FileType.DOCUMENT_CHUNKS); + return pyInfraRequest; + } + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponse.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponse.java new file mode 100644 index 000000000..80abc8ec7 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponse.java @@ -0,0 +1,25 @@ +package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing; + +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.FieldDefaults; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE) +public class ChunkingResponse { + + Map targetFilePath; + String responseFilePath; + + List data; + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponseData.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponseData.java new file mode 100644 index 000000000..37b7a9223 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/ChunkingResponseData.java @@ -0,0 +1,26 @@ +package com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing; + +import java.util.List; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.FieldDefaults; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE) +public class ChunkingResponseData { + + Integer chunkId; + String text; + List types; + List> treeIds; + float[] embedding; + Integer tokenCount; + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestFactory.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestFactory.java index 9a2f15d75..43c923a49 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestFactory.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestFactory.java @@ -5,7 +5,6 @@ import java.util.Optional; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierEntity; import com.iqser.red.service.persistence.management.v1.processor.service.FileManagementStorageService; import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings; import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils; @@ -23,7 +22,7 @@ public class LayoutParsingRequestFactory { private String applicationType; private final FileManagementStorageService fileManagementStorageService; - private final LayoutParsingRequestIdentifierService layoutParsingRequestIdentifierService; + private final FileManagementServiceSettings fileManagementServiceSettings; @@ -46,7 +45,7 @@ public class LayoutParsingRequestFactory { return LayoutParsingRequest.builder() .layoutParsingType(type) - .identifier(layoutParsingRequestIdentifierService.buildIdentifier(dossierId, fileId, priority)) + .identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, priority)) .originFileStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.ORIGIN)) .imagesFileStorageId(optionalImageFileId) .tablesFileStorageId(optionalTableFileId) diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestIdentifierService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/QueueMessageIdentifierService.java similarity index 86% rename from persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestIdentifierService.java rename to persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/QueueMessageIdentifierService.java index 5eccf6b44..6edb94911 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/LayoutParsingRequestIdentifierService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/layoutparsing/QueueMessageIdentifierService.java @@ -4,10 +4,10 @@ import java.util.Map; import org.springframework.stereotype.Service; -import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierEntity; +import lombok.experimental.UtilityClass; -@Service -public class LayoutParsingRequestIdentifierService { +@UtilityClass +public class QueueMessageIdentifierService { private enum IdentifierNames { DOSSIER_ID, diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java new file mode 100644 index 000000000..4cd6f9323 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java @@ -0,0 +1,84 @@ +package com.iqser.red.service.persistence.management.v1.processor.service.queue; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; +import com.iqser.red.service.persistence.management.v1.processor.model.PyInfraRequest; +import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService; +import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService; +import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo; + +import io.micrometer.observation.ObservationRegistry; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class ChunkingMessageReceiver { + + FileStatusService fileStatusService; + ObservationRegistry observationRegistry; + FileStatusProcessingUpdateService fileStatusProcessingUpdateService; + ObjectMapper mapper; + + + @RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE) + public void receive(Message message) { + + PyInfraRequest pyInfraRequest = parsePyInfraRequest(message); + addFileIdToTrace(pyInfraRequest.getFileUuid()); + + log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid()); + fileStatusService.setStatusAnalyse(pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid(), false); + } + + + @RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_DLQ) + public void handleDLQ(Message message) { + + PyInfraRequest pyInfraRequest = parsePyInfraRequest(message); + addFileIdToTrace(pyInfraRequest.getFileUuid()); + + log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid()); + fileStatusProcessingUpdateService.analysisFailed(pyInfraRequest.getDossierUuid(), + pyInfraRequest.getFileUuid(), + new FileErrorInfo("chunking service failed", + MessagingConfiguration.CHUNKING_SERVICE_DLQ, + "junker-chunker-service", + OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); + } + + + private PyInfraRequest parsePyInfraRequest(Message message) { + + PyInfraRequest pyInfraRequest = null; + try { + pyInfraRequest = mapper.readValue(message.getBody(), PyInfraRequest.class); + } catch (IOException e) { + log.error("Could not decode message {}", new String(message.getBody())); + throw new RuntimeException(e); + } + return pyInfraRequest; + } + + + // This can be removed if tracing is implemented in python services. + private void addFileIdToTrace(String fileId) { + + if (observationRegistry.getCurrentObservation() != null) { + observationRegistry.getCurrentObservation().highCardinalityKeyValue("fileId", fileId); + } + } + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java index c4f43510f..68b54cfd0 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java @@ -17,7 +17,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.FileSta import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService; import com.iqser.red.service.persistence.management.v1.processor.service.ImageSimilarityService; import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService; -import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.LayoutParsingRequestIdentifierService; +import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.SaasMigrationStatusPersistenceService; import com.iqser.red.service.persistence.management.v1.processor.utils.StorageIdUtils; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo; @@ -38,7 +38,6 @@ public class LayoutParsingFinishedMessageReceiver { private final FileStatusService fileStatusService; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; private final ObjectMapper objectMapper; - private final LayoutParsingRequestIdentifierService layoutParsingRequestIdentifierService; private final SaasMigrationStatusPersistenceService saasMigrationStatusPersistenceService; private final SaasMigrationService saasMigrationService; private final ImageSimilarityService imageSimilarityService; @@ -49,22 +48,23 @@ public class LayoutParsingFinishedMessageReceiver { @RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE) public void receive(LayoutParsingFinishedEvent response) { - if (saasMigrationStatusPersistenceService.isMigrating(layoutParsingRequestIdentifierService.parseFileId(response.identifier()))) { - saasMigrationService.handleLayoutParsingFinished(layoutParsingRequestIdentifierService.parseDossierId(response.identifier()), - layoutParsingRequestIdentifierService.parseFileId(response.identifier())); + var dossierId = QueueMessageIdentifierService.parseDossierId(response.identifier()); + var fileId = QueueMessageIdentifierService.parseFileId(response.identifier()); + log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE); + if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(response.identifier()))) { + saasMigrationService.handleLayoutParsingFinished(QueueMessageIdentifierService.parseDossierId(response.identifier()), + QueueMessageIdentifierService.parseFileId(response.identifier())); return; } var templateId = ""; - var dossierId = layoutParsingRequestIdentifierService.parseDossierId(response.identifier()); - var fileId = layoutParsingRequestIdentifierService.parseFileId(response.identifier()); var storageId = StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_STRUCTURE); - fileStatusService.setStatusAnalyse(layoutParsingRequestIdentifierService.parseDossierId(response.identifier()), - layoutParsingRequestIdentifierService.parseFileId(response.identifier()), - layoutParsingRequestIdentifierService.parsePriority(response.identifier())); + fileStatusService.setStatusAnalyse(QueueMessageIdentifierService.parseDossierId(response.identifier()), + QueueMessageIdentifierService.parseFileId(response.identifier()), + QueueMessageIdentifierService.parsePriority(response.identifier())); - fileStatusService.updateLayoutProcessedTime(layoutParsingRequestIdentifierService.parseFileId(response.identifier())); + fileStatusService.updateLayoutProcessedTime(QueueMessageIdentifierService.parseFileId(response.identifier())); websocketService.sendAnalysisEvent(dossierId, fileId, AnalyseStatus.LAYOUT_UPDATE, fileStatusService.getStatus(fileId).getNumberOfAnalyses() + 1); try { @@ -74,7 +74,6 @@ public class LayoutParsingFinishedMessageReceiver { throw e; } - log.info("Received message {} in {}", response, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE); } @@ -88,9 +87,9 @@ public class LayoutParsingFinishedMessageReceiver { if (errorCause == null) { errorCause = "Error occured during layout parsing!"; } - if (saasMigrationStatusPersistenceService.isMigrating(layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()))) { - saasMigrationService.handleError(layoutParsingRequestIdentifierService.parseDossierId(analyzeRequest.identifier()), - layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()), + if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()))) { + saasMigrationService.handleError(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()), + QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()), errorCause, LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE); return; @@ -99,8 +98,8 @@ public class LayoutParsingFinishedMessageReceiver { OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER); timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS); log.info("Failed to process layout parsing request, errorCause: {}, timestamp: {}", errorCause, timestamp); - fileStatusProcessingUpdateService.analysisFailed(layoutParsingRequestIdentifierService.parseDossierId(analyzeRequest.identifier()), - layoutParsingRequestIdentifierService.parseFileId(analyzeRequest.identifier()), + fileStatusProcessingUpdateService.analysisFailed(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()), + QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()), new FileErrorInfo(errorCause, LAYOUT_PARSING_DLQ, "layoutparser-service", timestamp)); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java index 5c6490c92..6d32f55ca 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java @@ -14,7 +14,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService; import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService; +import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo; +import com.knecon.fforesight.llm.service.LlmNerMessage; +import com.knecon.fforesight.llm.service.LlmNerResponseMessage; import io.micrometer.observation.ObservationRegistry; import lombok.RequiredArgsConstructor; @@ -48,6 +51,19 @@ public class NerMessageReceiver { } + @SneakyThrows + @RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE) + public void receiveLLM(LlmNerResponseMessage message) { + + String dossierId = QueueMessageIdentifierService.parseDossierId(message.getIdentifier()); + String fileId = QueueMessageIdentifierService.parseFileId(message.getIdentifier()); + addFileIdToTrace(fileId); + + log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId); + fileStatusService.setStatusAnalyse(dossierId, fileId, false); + } + + @RabbitListener(queues = MessagingConfiguration.NER_SERVICE_DLQ) public void handleDLQMessage(Message failedMessage) throws IOException { @@ -67,6 +83,26 @@ public class NerMessageReceiver { } + @RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_DLQ) + public void handleLLMDLQMessage(Message failedMessage) throws IOException { + + LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class); + + String dossierId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier()); + String fileId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier()); + + addFileIdToTrace(fileId); + + log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_DLQ, dossierId, fileId); + fileStatusProcessingUpdateService.analysisFailed(dossierId, + fileId, + new FileErrorInfo("llm ner service failed", + MessagingConfiguration.LLM_NER_SERVICE_DLQ, + "entity-recognition-service-v3", + OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); + } + + // This can be removed if tracing is implemented in python services. private void addFileIdToTrace(String fileId) { diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/settings/FileManagementServiceSettings.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/settings/FileManagementServiceSettings.java index e9ff584bb..db48ab9d8 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/settings/FileManagementServiceSettings.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/settings/FileManagementServiceSettings.java @@ -25,6 +25,7 @@ public class FileManagementServiceSettings { private boolean imageServiceEnabled = true; private boolean nerServiceEnabled = true; private boolean azureNerServiceEnabled; + private boolean llmNerServiceEnabled; private boolean visualLayoutParsingEnabled; private boolean storeImageFile = true; diff --git a/persistence-service-v1/persistence-service-shared-api-v1/src/main/java/com/iqser/red/service/persistence/service/v1/api/shared/model/dossiertemplate/dossier/file/FileType.java b/persistence-service-v1/persistence-service-shared-api-v1/src/main/java/com/iqser/red/service/persistence/service/v1/api/shared/model/dossiertemplate/dossier/file/FileType.java index d7acb3796..5e135ef47 100644 --- a/persistence-service-v1/persistence-service-shared-api-v1/src/main/java/com/iqser/red/service/persistence/service/v1/api/shared/model/dossiertemplate/dossier/file/FileType.java +++ b/persistence-service-v1/persistence-service-shared-api-v1/src/main/java/com/iqser/red/service/persistence/service/v1/api/shared/model/dossiertemplate/dossier/file/FileType.java @@ -11,6 +11,7 @@ public enum FileType { SIMPLIFIED_TEXT(".json"), TEXT(".json"), // deprecated file type, only present in legacy migrations NER_ENTITIES(".json"), + LLM_NER_ENTITIES(".json"), AZURE_NER_ENTITIES(".json"), IMAGE_INFO(".json"), IMPORTED_REDACTIONS(".json"), @@ -26,6 +27,7 @@ public enum FileType { DOCUMENT_STRUCTURE(".json"), DOCUMENT_POSITION(".json"), DOCUMENT_PAGES(".json"), + DOCUMENT_CHUNKS(".json"), ENTITY_LOG(".json"), ENTITY_LOG_BAK(".json"), COMPONENT_LOG(".json"),