From 4789a89257187cb63684743120ed8498e41badfc Mon Sep 17 00:00:00 2001 From: Maverick Studer Date: Thu, 29 Aug 2024 15:11:03 +0200 Subject: [PATCH] Tenant retry and queue renames --- .../build.gradle.kts | 13 +- .../configuration/MessagingConfiguration.java | 213 ++++++++++-------- .../UserMessagingConfiguration.java | 14 +- .../service/DossierTemplateExportService.java | 2 +- .../service/FileExchangeExportService.java | 2 +- .../migrations/QueueRenameMigration23.java | 185 +++++++-------- .../v1/processor/service/DownloadService.java | 2 +- .../processor/service/FileStatusService.java | 13 +- .../v1/processor/service/IndexingService.java | 4 +- .../download/DownloadCompressingService.java | 2 +- .../download/DownloadDLQMessageReceiver.java | 2 +- .../AnalysisFlagCalculationSchedulerJob.java | 3 +- .../queue/ChunkingMessageReceiver.java | 12 +- .../service/queue/NerMessageReceiver.java | 11 +- .../queue/OCRProcessingMessageReceiver.java | 2 +- .../TenantExchangeMessageReceiverImpl.java | 34 ++- .../build.gradle.kts | 4 +- 17 files changed, 285 insertions(+), 233 deletions(-) diff --git a/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts b/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts index d59a0fe6a..b090438cf 100644 --- a/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts +++ b/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts @@ -30,7 +30,7 @@ dependencies { exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1") exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1") } - api("com.knecon.fforesight:layoutparser-service-internal-api:0.161.0") { + api("com.knecon.fforesight:layoutparser-service-internal-api:0.163.0") { exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1") exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1") } @@ -42,10 +42,15 @@ dependencies { exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1") exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1") } - implementation("com.knecon.fforesight:llm-service-api:1.10.0") + implementation("com.knecon.fforesight:llm-service-api:1.13.0") api("com.knecon.fforesight:jobs-commons:0.10.0") - api("com.knecon.fforesight:database-tenant-commons:0.24.0") - api("com.knecon.fforesight:keycloak-commons:0.30.0") + api("com.knecon.fforesight:tenant-commons:0.29.0") + api("com.knecon.fforesight:database-tenant-commons:0.24.0") { + exclude(group = "com.knecon.fforesight", module = "tenant-commons") + } + api("com.knecon.fforesight:keycloak-commons:0.30.0") { + exclude(group = "com.knecon.fforesight", module = "tenant-commons") + } api("com.knecon.fforesight:tracing-commons:0.5.0") api("com.knecon.fforesight:swagger-commons:0.7.0") api("com.giffing.bucket4j.spring.boot.starter:bucket4j-spring-boot-starter:0.4.0") diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java index 146ac188d..51e29aa65 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java @@ -18,107 +18,114 @@ import lombok.RequiredArgsConstructor; public class MessagingConfiguration { // persistence-service - public static final String DOWNLOAD_QUEUE_PREFIX = "download_queue"; - public static final String DOWNLOAD_EXCHANGE = "download_exchange"; - public static final String DOWNLOAD_DLQ = "download_dlq"; + public static final String DOWNLOAD_REQUEST_QUEUE_PREFIX = "download_request"; + public static final String DOWNLOAD_REQUEST_EXCHANGE = "download_request_exchange"; + public static final String DOWNLOAD_DLQ = "download_error"; - public static final String DOWNLOAD_COMPRESSION_QUEUE_PREFIX = "download_compression_queue"; - public static final String DOWNLOAD_COMPRESSION_EXCHANGE = "download_compression_exchange"; - public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_dlq"; + public static final String DOWNLOAD_COMPRESSION_REQUEST_QUEUE_PREFIX = "download_compression_request"; + public static final String DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE = "download_compression_request_exchange"; + public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_error"; - public static final String EXPORT_DOWNLOAD_QUEUE_PREFIX = "export_download_queue"; - public static final String EXPORT_DOWNLOAD_EXCHANGE = "export_download_exchange"; - public static final String EXPORT_DOWNLOAD_DLQ = "export_download_dlq"; + public static final String EXPORT_DOWNLOAD_REQUEST_QUEUE_PREFIX = "export_download_request"; + public static final String EXPORT_DOWNLOAD_REQUEST_EXCHANGE = "export_download_request_exchange"; + public static final String EXPORT_DOWNLOAD_DLQ = "export_download_error"; - public static final String ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX = "analysis_flag_calculation_queue"; - public static final String ANALYSIS_FLAG_CALCULATION_EXCHANGE = "analysis_flag_calculation_exchange"; + public static final String ANALYSIS_FLAG_CALCULATION_REQUEST_QUEUE_PREFIX = "analysis_flag_calculation_request"; + public static final String ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE = "analysis_flag_calculation_request_exchange"; - public static final String REDACTION_RESPONSE_QUEUE_PREFIX = "redaction_response_queue"; + public static final String REDACTION_RESPONSE_QUEUE_PREFIX = "redaction_response"; public static final String REDACTION_RESPONSE_EXCHANGE = "redaction_response_exchange"; - public static final String PDFTRON_RESPONSE_QUEUE_PREFIX = "pdftron_response_queue"; + public static final String PDFTRON_RESPONSE_QUEUE_PREFIX = "pdftron_response"; public static final String PDFTRON_RESPONSE_EXCHANGE = "pdftron_response_exchange"; - public static final String REPORT_RESPONSE_QUEUE_PREFIX = "report_response_queue"; + public static final String REPORT_RESPONSE_QUEUE_PREFIX = "report_response"; public static final String REPORT_RESPONSE_EXCHANGE = "report_response_exchange"; - public static final String REPORT_RESPONSE_DLQ = "report_response_dlq"; + public static final String REPORT_RESPONSE_DLQ = "report_response_error"; - public static final String OCR_RESPONSE_QUEUE_PREFIX = "ocr_response_queue"; + public static final String OCR_RESPONSE_QUEUE_PREFIX = "ocr_response"; public static final String OCR_RESPONSE_EXCHANGE = "ocr_response_exchange"; - public static final String OCR_STATUS_UPDATE_QUEUE_PREFIX = "ocr_status_update_queue"; - public static final String OCR_STATUS_UPDATE_EXCHANGE = "ocr_status_update_exchange"; - public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_dlq"; + public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE_PREFIX = "ocr_status_update_response"; + public static final String OCR_STATUS_UPDATE_RESPONSE_EXCHANGE = "ocr_status_update_response_exchange"; + public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_error"; - public static final String IMAGE_RESPONSE_QUEUE_PREFIX = "image_response_queue"; + public static final String IMAGE_RESPONSE_QUEUE_PREFIX = "image_response"; public static final String IMAGE_RESPONSE_EXCHANGE = "image_response_exchange"; - public static final String ENTITY_RESPONSE_QUEUE_PREFIX = "entity_response_queue"; + public static final String ENTITY_RESPONSE_QUEUE_PREFIX = "entity_response"; public static final String ENTITY_RESPONSE_EXCHANGE = "entity_response_exchange"; - public static final String AZURE_ENTITY_RESPONSE_QUEUE_PREFIX = "azure_entity_response_queue"; + public static final String AZURE_ENTITY_RESPONSE_QUEUE_PREFIX = "azure_entity_response"; public static final String AZURE_ENTITY_RESPONSE_EXCHANGE = "azure_entity_response_exchange"; - public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue"; - public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue"; - public static final String CHUNKING_SERVICE_DLQ = "chunking_service_dlq"; + public static final String CHUNKING_RESPONSE_QUEUE_PREFIX = "chunking_response"; + public static final String CHUNKING_RESPONSE_EXCHANGE = "chunking_response_exchange"; + public static final String CHUNKING_DLQ = "chunking_error"; - public static final String LLM_NER_SERVICE_QUEUE = QueueNames.LLM_NER_SERVICE_QUEUE; - public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE; - public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ; + public static final String LLM_NER_RESPONSE_QUEUE_PREFIX = QueueNames.LLM_NER_RESPONSE_QUEUE_PREFIX; + public static final String LLM_NER_RESPONSE_EXCHANGE = QueueNames.LLM_NER_RESPONSE_EXCHANGE; + public static final String LLM_NER_DLQ = QueueNames.LLM_NER_DLQ; - public static final String CV_ANALYSIS_RESPONSE_QUEUE_PREFIX = "cv_analysis_response_queue"; + public static final String CV_ANALYSIS_RESPONSE_QUEUE_PREFIX = "cv_analysis_response"; public static final String CV_ANALYSIS_RESPONSE_EXCHANGE = "cv_analysis_response_exchange"; - public static final String VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "visual_layout_parsing_response_queue"; + public static final String VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "visual_layout_parsing_response"; public static final String VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE = "visual_layout_parsing_response_exchange"; // pdftron-redaction-service - public static final String PREPROCESSING_EXCHANGE = "preprocessing_exchange"; - public static final String PREPROCESSING_DLQ = "preprocessing_dlq"; + public static final String PREPROCESSING_REQUEST_EXCHANGE = "preprocessing_request_exchange"; + public static final String PREPROCESSING_DLQ = "preprocessing_error"; public static final String PDFTRON_REQUEST_EXCHANGE = "pdftron_request_exchange"; - public static final String PDFTRON_DLQ = "pdftron_dlq"; + public static final String PDFTRON_DLQ = "pdftron_error"; // redaction-service - public static final String REDACTION_REQUEST_QUEUE_PREFIX = "redaction_request_queue"; + public static final String REDACTION_REQUEST_QUEUE_PREFIX = "redaction_request"; public static final String REDACTION_REQUEST_EXCHANGE = "redaction_request_exchange"; + public static final String REDACTION_PRIORITY_REQUEST_QUEUE_PREFIX = "redaction_priority_request"; public static final String REDACTION_PRIORITY_REQUEST_EXCHANGE = "redaction_priority_request_exchange"; - public static final String REDACTION_DLQ = "redaction_dlq"; + public static final String REDACTION_DLQ = "redaction_error"; // redaction-report-service public static final String REPORT_REQUEST_EXCHANGE = "report_request_exchange"; - public static final String REPORT_REQUEST_DLQ = "report_request_dlq"; + public static final String REPORT_REQUEST_DLQ = "report_request_error"; // search-service - public static final String INDEXING_EXCHANGE = "indexing_exchange"; - public static final String INDEXING_DLQ = "indexing_dlq"; + public static final String INDEXING_REQUEST_EXCHANGE = "indexing_request_exchange"; + public static final String INDEXING_DLQ = "indexing_error"; - public static final String DELETE_FROM_INDEX_EXCHANGE = "delete_from_index_exchange"; - public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_dlq"; + public static final String DELETE_FROM_INDEX_REQUEST_EXCHANGE = "delete_from_index_request_exchange"; + public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_error"; // ocr-services public static final String OCR_REQUEST_EXCHANGE = "ocr_request_exchange"; - public static final String OCR_DLQ = "ocr_dlq"; + public static final String OCR_DLQ = "ocr_error"; // image-service public static final String IMAGE_REQUEST_EXCHANGE = "image_request_exchange"; - public static final String IMAGE_DLQ = "image_dlq"; + public static final String IMAGE_DLQ = "image_error"; // entity-recognition-service public static final String ENTITY_REQUEST_EXCHANGE = "entity_request_exchange"; - public static final String ENTITY_DLQ = "entity_dlq"; + public static final String ENTITY_DLQ = "entity_error"; // azure-ner-service public static final String AZURE_ENTITY_REQUEST_EXCHANGE = "azure_entity_request_exchange"; - public static final String AZURE_ENTITY_DLQ = "azure_entity_dlq"; + public static final String AZURE_ENTITY_DLQ = "azure_entity_error"; // cv-analysis-service public static final String CV_ANALYSIS_REQUEST_EXCHANGE = "cv_analysis_request_exchange"; - public static final String CV_ANALYSIS_DLQ = "cv_analysis_dlq"; + public static final String CV_ANALYSIS_DLQ = "cv_analysis_error"; // visual-layout-parsing-service public static final String VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE = "visual_layout_parsing_request_exchange"; - public static final String VISUAL_LAYOUT_PARSING_DLQ = "visual_layout_parsing_dlq"; + public static final String VISUAL_LAYOUT_PARSING_DLQ = "visual_layout_parsing_error"; + + //chunking-service + public static final String CHUNKING_REQUEST_EXCHANGE = "chunking_request_exchange"; + + //llm-ner-service + public static final String LLM_NER_REQUEST_EXCHANGE = QueueNames.LLM_NER_REQUEST_EXCHANGE; public static final String X_ERROR_INFO_HEADER = "x-error-message"; public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp"; @@ -133,7 +140,7 @@ public class MessagingConfiguration { @Bean public DirectExchange downloadExchange() { - return new DirectExchange(DOWNLOAD_EXCHANGE); + return new DirectExchange(DOWNLOAD_REQUEST_EXCHANGE); } @@ -147,7 +154,7 @@ public class MessagingConfiguration { @Bean public DirectExchange downloadCompressionExchange() { - return new DirectExchange(DOWNLOAD_COMPRESSION_EXCHANGE); + return new DirectExchange(DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE); } @@ -161,7 +168,7 @@ public class MessagingConfiguration { @Bean public DirectExchange exportDownloadExchange() { - return new DirectExchange(EXPORT_DOWNLOAD_EXCHANGE); + return new DirectExchange(EXPORT_DOWNLOAD_REQUEST_EXCHANGE); } @@ -175,7 +182,7 @@ public class MessagingConfiguration { @Bean public DirectExchange analysisFlagCalculationExchange() { - return new DirectExchange(ANALYSIS_FLAG_CALCULATION_EXCHANGE); + return new DirectExchange(ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE); } @@ -194,50 +201,30 @@ public class MessagingConfiguration { @Bean - public Queue llmNerRequestQueue() { + public DirectExchange chunkingResponseExchange() { - return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build(); - } - - - @Bean - public Queue llmNerResponseQueue() { - - return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ) - .build(); - } - - - @Bean - public Queue llmNerResponseDLQ() { - - return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build(); - } - - - @Bean - public Queue chunkingRequestQueue() { - - return QueueBuilder.durable(CHUNKING_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ).build(); - } - - - @Bean - public Queue chunkingResponseQueue() { - - return QueueBuilder.durable(CHUNKING_SERVICE_RESPONSE_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ) - .build(); + return new DirectExchange(CHUNKING_RESPONSE_EXCHANGE); } @Bean public Queue chunkingDLQ() { - return QueueBuilder.durable(CHUNKING_SERVICE_DLQ).build(); + return QueueBuilder.durable(CHUNKING_DLQ).build(); + } + + + @Bean + public DirectExchange llmNerResponseExchange() { + + return new DirectExchange(LLM_NER_RESPONSE_EXCHANGE); + } + + + @Bean + public Queue llmNerDLQ() { + + return QueueBuilder.durable(LLM_NER_DLQ).build(); } @@ -268,6 +255,7 @@ public class MessagingConfiguration { return new DirectExchange(ENTITY_RESPONSE_EXCHANGE); } + @Bean public DirectExchange azureEntityResponseExchange() { @@ -298,19 +286,33 @@ public class MessagingConfiguration { // ---- pdftron-redaction-service ---- @Bean - public DirectExchange preprocessingExchange() { + public DirectExchange preprocessingRequestExchange() { - return new DirectExchange(PREPROCESSING_EXCHANGE); + return new DirectExchange(PREPROCESSING_REQUEST_EXCHANGE); } @Bean - public Queue preprocessingDeadLetterQueue() { + public DirectExchange preprocessingResponseExchange() { + + return new DirectExchange(PREPROCESSING_REQUEST_EXCHANGE); + } + + + @Bean + public Queue preprocessingDLQ() { return QueueBuilder.durable(PREPROCESSING_DLQ).build(); } + @Bean + public DirectExchange pdftronRequestExchange() { + + return new DirectExchange(PDFTRON_REQUEST_EXCHANGE); + } + + @Bean public DirectExchange pdftronResponseExchange() { @@ -364,9 +366,9 @@ public class MessagingConfiguration { // ---- search-service ---- @Bean - public DirectExchange indexingExchange() { + public DirectExchange indexingRequestExchange() { - return new DirectExchange(INDEXING_EXCHANGE); + return new DirectExchange(INDEXING_REQUEST_EXCHANGE); } @@ -378,9 +380,9 @@ public class MessagingConfiguration { @Bean - public DirectExchange deleteFromIndexExchange() { + public DirectExchange deleteFromIndexRequestExchange() { - return new DirectExchange(DELETE_FROM_INDEX_EXCHANGE); + return new DirectExchange(DELETE_FROM_INDEX_REQUEST_EXCHANGE); } @@ -407,9 +409,9 @@ public class MessagingConfiguration { @Bean - public DirectExchange ocrStatusUpdateExchange() { + public DirectExchange ocrStatusUpdateResponseExchange() { - return new DirectExchange(OCR_STATUS_UPDATE_EXCHANGE); + return new DirectExchange(OCR_STATUS_UPDATE_RESPONSE_EXCHANGE); } @@ -448,7 +450,10 @@ public class MessagingConfiguration { return QueueBuilder.durable(ENTITY_DLQ).build(); } + // ---- azure-ner-service ---- + + @Bean public DirectExchange azureNerRequestExchange() { @@ -508,6 +513,22 @@ public class MessagingConfiguration { } + // ---- chunking-service ---- + @Bean + public DirectExchange chunkingRequestExchange() { + + return new DirectExchange(CHUNKING_REQUEST_EXCHANGE); + } + + + // ----- llm-ner-service ---- + @Bean + public DirectExchange llmNerRequestExchange() { + + return new DirectExchange(LLM_NER_REQUEST_EXCHANGE); + } + + // ---- Saas Migration ---- @Bean public Queue migrationQueue() { diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java index c8724e8a5..a148d6c75 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java @@ -16,13 +16,13 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public class UserMessagingConfiguration { - public static final String PERSISTENCE_SERVICE_USER_CREATED_QUEUE = "persistence-service-user-created-queue"; - public static final String PERSISTENCE_SERVICE_USER_DELETED_QUEUE = "persistence-service-user-deleted-queue"; - public static final String PERSISTENCE_SERVICE_USER_UPDATED_QUEUE = "persistence-service-user-updated-queue"; - public static final String PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE = "persistence-service-user-status-changed-queue"; - public static final String PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE = "persistence-service-user-roles-updated-queue"; - public static final String PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE = "persistence-service-user-own-profile-updated-queue"; - public static final String PERSISTENCE_SERVICE_USER_EVENTS_DLQ = "persistence-service-user-events-dlq"; + public static final String PERSISTENCE_SERVICE_USER_CREATED_QUEUE = "persistence-service-user-created"; + public static final String PERSISTENCE_SERVICE_USER_DELETED_QUEUE = "persistence-service-user-deleted"; + public static final String PERSISTENCE_SERVICE_USER_UPDATED_QUEUE = "persistence-service-user-updated"; + public static final String PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE = "persistence-service-user-status-changed"; + public static final String PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE = "persistence-service-user-roles-updated"; + public static final String PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE = "persistence-service-user-own-profile-updated"; + public static final String PERSISTENCE_SERVICE_USER_EVENTS_DLQ = "persistence-service-user-events-error"; @Bean("persistenceServiceUserRolesUpdatedQueue") diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java index ef8d39377..9210bfa9c 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java @@ -119,7 +119,7 @@ public class DossierTemplateExportService { private void addToExportDownloadQueue(ExportDownloadMessage downloadJob, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_REQUEST_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { message.getMessageProperties().setPriority(priority); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java index f8d206918..0fd0a11ad 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java @@ -75,7 +75,7 @@ public class FileExchangeExportService { private void addToExportDownloadQueue(ExportDownloadMessage downloadJob) { - rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_REQUEST_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { message.getMessageProperties().setPriority(1); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java index 7b4a45d64..7901565be 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java @@ -10,92 +10,99 @@ import com.iqser.red.service.persistence.management.v1.processor.migration.Migra import lombok.Setter; import lombok.extern.slf4j.Slf4j; -@Slf4j -@Setter -@Service -public class QueueRenameMigration23 extends Migration { - - private final AmqpAdmin amqpAdmin; - - private static final String NAME = "Migration for renaming of most queues"; - private static final long VERSION = 23; - private static final List queueNames = List.of("analysis_flag_calculation_queue", - "cv_analysis_dead_letter_queue", - "cv_analysis_request_queue", - "cv_analysis_response_queue", - "deleteFromIndexDLQ", - "deleteFromIndexQueue", - "downloadDLQ", - "downloadQueue", - "download_compression_dlq", - "download_compression_queue", - "entity_dead_letter_queue", - "entity_request_queue", - "entity_response_queue", - "exportDownloadDLQ", - "exportDownloadQueue", - "image_dead_letter_queue", - "image_request_queue", - "image_response_queue", - "indexingDQL", - "indexingQueue", - "layout_parsing_dead_letter_queue", - "layout_parsing_request_queue", - "layout_parsing_response_queue", - "migrationDLQ", - "migrationQueue", - "migrationResponseQueue", - "mongo-tenant-created", - "mongo-tenant-created-dlq", - "ocr_dead_letter_queue", - "ocr_request_queue", - "ocr_response_queue", - "ocr_status_update_dead_letter_queue", - "ocr_status_update_response_queue", - "pdftron_dlq", - "pdftron_queue", - "pdftron_result_queue", - "persistence-service-user-created-queue", - "persistence-service-user-deleted-queue", - "persistence-service-user-events-dql", - "persistence-service-user-own-profile-updated-queue", - "persistence-service-user-roles-updated-queue", - "persistence-service-user-status-changed-queue", - "persistence-service-user-updated-queue", - "preprocessingDLQ", - "preprocessingQueue", - "redactionAnalysisResponseQueue", - "redactionDQL", - "redactionPriorityQueue", - "redactionQueue", - "reportDLQ", - "reportQueue", - "reportResultDLQ", - "reportResultQueue", - "tenant-created", - "tenant-created-dlq", - "tenant-delete-dlq", - "tenant-delete-queue", - "tenant-sync", - "tenant-sync-dlq", - "visual_layout_parsing_service_dead_letter_queue", - "visual_layout_parsing_service_queue", - "visual_layout_parsing_service_response_queue"); - - - public QueueRenameMigration23(AmqpAdmin amqpAdmin) { - - super(NAME, VERSION); - this.amqpAdmin = amqpAdmin; - } - - - @Override - protected void migrate() { - - log.info("Migration: Deleting queues..."); - queueNames.forEach(amqpAdmin::deleteQueue); - - } - -} +// TODO: run in a migration after 4.2 -> 4.3 +// +//@Slf4j +//@Setter +//@Service +//public class QueueRenameMigration23 extends Migration { +// +// private final AmqpAdmin amqpAdmin; +// +// private static final String NAME = "Migration for renaming of most queues"; +// private static final long VERSION = 23; +// private static final List queueNames = List.of("analysis_flag_calculation_queue", +// "cv_analysis_dead_letter_queue", +// "cv_analysis_request_queue", +// "cv_analysis_response_queue", +// "deleteFromIndexDLQ", +// "deleteFromIndexQueue", +// "downloadDLQ", +// "downloadQueue", +// "download_compression_dlq", +// "download_compression_queue", +// "entity_dead_letter_queue", +// "entity_request_queue", +// "entity_response_queue", +// "exportDownloadDLQ", +// "exportDownloadQueue", +// "image_dead_letter_queue", +// "image_request_queue", +// "image_response_queue", +// "indexingDQL", +// "indexingQueue", +// "layout_parsing_dead_letter_queue", +// "layout_parsing_request_queue", +// "layout_parsing_response_queue", +// "migrationDLQ", +// "migrationQueue", +// "migrationResponseQueue", +// "mongo-tenant-created", +// "mongo-tenant-created-dlq", +// "ocr_dead_letter_queue", +// "ocr_request_queue", +// "ocr_response_queue", +// "ocr_status_update_dead_letter_queue", +// "ocr_status_update_response_queue", +// "pdftron_dlq", +// "pdftron_queue", +// "pdftron_result_queue", +// "persistence-service-user-created-queue", +// "persistence-service-user-deleted-queue", +// "persistence-service-user-events-dql", +// "persistence-service-user-own-profile-updated-queue", +// "persistence-service-user-roles-updated-queue", +// "persistence-service-user-status-changed-queue", +// "persistence-service-user-updated-queue", +// "preprocessingDLQ", +// "preprocessingQueue", +// "redactionAnalysisResponseQueue", +// "redactionDQL", +// "redactionPriorityQueue", +// "redactionQueue", +// "reportDLQ", +// "reportQueue", +// "reportResultDLQ", +// "reportResultQueue", +// "tenant-created", +// "tenant-created-dlq", +// "tenant-delete-dlq", +// "tenant-delete-queue", +// "tenant-updated-queue", +// "tenant-updated-dlq", +// "tenant-sync", +// "tenant-sync-dlq", +// "visual_layout_parsing_service_dead_letter_queue", +// "visual_layout_parsing_service_queue", +// "visual_layout_parsing_service_response_queue"); +// +// +// public QueueRenameMigration23(AmqpAdmin amqpAdmin) { +// +// super(NAME, VERSION); +// this.amqpAdmin = amqpAdmin; +// } +// +// +// @Override +// protected void migrate() { +// +// log.info("Migration: Deleting queues..."); +// for (String queueName : queueNames) { +// log.info("Migration: Deleting queue {}", queueName); +// amqpAdmin.deleteQueue(queueName); +// } +// +// } +// +//} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java index 5ce2fbedd..bf4698bad 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java @@ -140,7 +140,7 @@ public class DownloadService { private void addToDownloadQueue(DownloadJob downloadJob, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_REQUEST_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { message.getMessageProperties().setPriority(priority); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java index eaca681b5..6332436d7 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java @@ -1,6 +1,6 @@ package com.iqser.red.service.persistence.management.v1.processor.service; -import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE; +import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_REQUEST_EXCHANGE; import java.time.OffsetDateTime; import java.util.ArrayList; @@ -299,14 +299,14 @@ public class FileStatusService { if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) { var chunkingRequest = ChunkingRequestFactory.createChunkingRequest(dossierId, fileId); setStatusFullProcessing(fileId); - log.info("Sending request to {} for {}/{}", CHUNKING_SERVICE_QUEUE, dossierId, fileId); - rabbitTemplate.convertAndSend(CHUNKING_SERVICE_QUEUE, chunkingRequest); + log.info("Sending request to {} for {}/{}", CHUNKING_REQUEST_EXCHANGE, dossierId, fileId); + rabbitTemplate.convertAndSend(CHUNKING_REQUEST_EXCHANGE, TenantContext.getTenantId(), chunkingRequest); sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity); return; } if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.LLM_NER_ENTITIES)) { - log.info("Add file: {} from dossier {} to NER queue", fileId, dossierId); + log.info("Add file: {} from dossier {} to LLM NER queue", fileId, dossierId); addToLLMNerQueue(dossierId, fileId); sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity); return; @@ -426,7 +426,7 @@ public class FileStatusService { setStatusPreProcessingQueued(fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.PREPROCESSING_EXCHANGE, TenantContext.getTenantId(), processUntouchedDocumentRequest); + rabbitTemplate.convertAndSend(MessagingConfiguration.PREPROCESSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), processUntouchedDocumentRequest); } @@ -531,7 +531,8 @@ public class FileStatusService { protected void addToLLMNerQueue(String dossierId, String fileId) { setStatusNerAnalyzing(fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_SERVICE_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_REQUEST_EXCHANGE, + TenantContext.getTenantId(), LlmNerMessage.builder() .identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, false)) .chunksStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java index 2d3e0f1f7..99afb770e 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java @@ -67,7 +67,7 @@ public class IndexingService { public void addToIndexingQueue(IndexMessageType indexMessageType, String dossierTemplateId, String dossierId, String fileId, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_EXCHANGE, + rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_REQUEST_EXCHANGE, TenantContext.getTenantId(), IndexMessage.builder().messageType(indexMessageType).dossierTemplateId(dossierTemplateId).dossierId(dossierId).fileId(fileId).build(), message -> { @@ -79,7 +79,7 @@ public class IndexingService { public void addToDeleteFromIndexQueue(String dossierId, String fileId, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_EXCHANGE, + rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_REQUEST_EXCHANGE, TenantContext.getTenantId(), IndexMessage.builder().dossierId(dossierId).fileId(fileId).build(), message -> { diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java index b0414806d..1043f7807 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java @@ -30,7 +30,7 @@ public class DownloadCompressingService { if (updated == 1) { websocketService.sendDownloadEvent(downloadId, userId, DownloadStatusValue.COMPRESSING); - rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_EXCHANGE, + rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE, TenantContext.getTenantId(), DownloadJob.builder().storageId(downloadId).userId(userId).build()); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java index 8616882d7..f76c9aeab 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java @@ -75,7 +75,7 @@ public class DownloadDLQMessageReceiver { @RabbitListener(queues = MessagingConfiguration.REPORT_RESPONSE_DLQ) public void handleReportResponseDlqMessage(ReportResultMessage reportResultMessage) { - log.warn("Handling report request in DLQ userId: {} storageId: {} - setting status to error!", reportResultMessage.getUserId(), reportResultMessage.getDownloadId()); + log.warn("Handling report response in DLQ userId: {} storageId: {} - setting status to error!", reportResultMessage.getUserId(), reportResultMessage.getDownloadId()); setDownloadFailed(reportResultMessage.getUserId(), reportResultMessage.getDownloadId()); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java index 449dfdcc9..41d4df59d 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java @@ -2,7 +2,6 @@ package com.iqser.red.service.persistence.management.v1.processor.service.job; import static com.iqser.red.service.persistence.management.v1.processor.utils.TenantUtils.isTenantReadyForPersistence; -import java.time.OffsetDateTime; import java.util.Set; import org.quartz.DisallowConcurrentExecution; @@ -51,7 +50,7 @@ import lombok.extern.slf4j.Slf4j; log.debug("Found {} files which require analysis flag calculation. Tenant: {}", fileIdentifiers.size(), tenant.getDisplayName()); - fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_EXCHANGE, + fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE, TenantContext.getTenantId(), new AnalysisFlagCalculationMessage(fileIdentifier.dossierId(), fileIdentifier.fileId()))); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java index 4cd6f9323..6752e9db1 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ChunkingMessageReceiver.java @@ -27,34 +27,36 @@ import lombok.extern.slf4j.Slf4j; @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class ChunkingMessageReceiver { + public static final String CHUNKING_RESPONSE_LISTENER_ID = "chunking-response-listener"; + FileStatusService fileStatusService; ObservationRegistry observationRegistry; FileStatusProcessingUpdateService fileStatusProcessingUpdateService; ObjectMapper mapper; - @RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE) + @RabbitListener(id = CHUNKING_RESPONSE_LISTENER_ID) public void receive(Message message) { PyInfraRequest pyInfraRequest = parsePyInfraRequest(message); addFileIdToTrace(pyInfraRequest.getFileUuid()); - log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid()); + log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_RESPONSE_EXCHANGE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid()); fileStatusService.setStatusAnalyse(pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid(), false); } - @RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_DLQ) + @RabbitListener(queues = MessagingConfiguration.CHUNKING_DLQ) public void handleDLQ(Message message) { PyInfraRequest pyInfraRequest = parsePyInfraRequest(message); addFileIdToTrace(pyInfraRequest.getFileUuid()); - log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid()); + log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid()); fileStatusProcessingUpdateService.analysisFailed(pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid(), new FileErrorInfo("chunking service failed", - MessagingConfiguration.CHUNKING_SERVICE_DLQ, + MessagingConfiguration.CHUNKING_DLQ, "junker-chunker-service", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java index ae5a48f75..4fb4ab86d 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java @@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j; public class NerMessageReceiver { public static final String ENTITY_RESPONSE_LISTENER_ID = "entity-response-listener"; + public static final String LLM_ENTITY_RESPONSE_LISTENER_ID = "llm-entity-response-listener"; private final FileStatusService fileStatusService; private final ObjectMapper objectMapper; @@ -54,14 +55,14 @@ public class NerMessageReceiver { @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE) + @RabbitListener(id = LLM_ENTITY_RESPONSE_LISTENER_ID) public void receiveLLM(LlmNerResponseMessage message) { String dossierId = QueueMessageIdentifierService.parseDossierId(message.getIdentifier()); String fileId = QueueMessageIdentifierService.parseFileId(message.getIdentifier()); addFileIdToTrace(fileId); - log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId); + log.info("Received message from {} for dossierId {} and fileId {}", LLM_ENTITY_RESPONSE_LISTENER_ID, dossierId, fileId); fileStatusService.setStatusAnalyse(dossierId, fileId, false); } @@ -85,7 +86,7 @@ public class NerMessageReceiver { } - @RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_DLQ) + @RabbitListener(queues = MessagingConfiguration.LLM_NER_DLQ) public void handleLLMDLQMessage(Message failedMessage) throws IOException { LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class); @@ -95,11 +96,11 @@ public class NerMessageReceiver { addFileIdToTrace(fileId); - log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_DLQ, dossierId, fileId); + log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_DLQ, dossierId, fileId); fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId, new FileErrorInfo("llm ner service failed", - MessagingConfiguration.LLM_NER_SERVICE_DLQ, + MessagingConfiguration.LLM_NER_DLQ, "entity-recognition-service-v3", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java index ab369d5fb..eb4be9542 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java @@ -57,7 +57,7 @@ public class OCRProcessingMessageReceiver { response.getNumberOfOCRedPages()); } - log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE); + log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_EXCHANGE); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java index 4199caeab..8b21a7935 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java @@ -8,10 +8,12 @@ import static com.iqser.red.service.persistence.management.v1.processor.service. import static com.iqser.red.service.persistence.management.v1.processor.service.download.RedactionResultMessageReceiver.PDFTRON_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AnalysisFlagsCalculationMessageReceiver.ANALYSIS_FLAG_CALCULATION_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AzureNerMessageReceiver.AZURE_ENTITY_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.ChunkingMessageReceiver.CHUNKING_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.CvAnalysisMessageReceiver.CV_ANALYSIS_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.ImageMessageReceiver.IMAGE_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.LayoutParsingFinishedMessageReceiver.LAYOUT_PARSING_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.NerMessageReceiver.ENTITY_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.NerMessageReceiver.LLM_ENTITY_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_RESPONSE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_STATUS_UPDATE_LISTENER_ID; import static com.iqser.red.service.persistence.management.v1.processor.service.queue.RedactionAnalysisResponseReceiver.REDACTION_RESPONSE_LISTENER_ID; @@ -48,26 +50,26 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece return Set.of(TenantQueueConfiguration.builder() .listenerId(DOWNLOAD_LISTENER_ID) - .exchangeName(DOWNLOAD_EXCHANGE) - .queuePrefix(DOWNLOAD_QUEUE_PREFIX) + .exchangeName(DOWNLOAD_REQUEST_EXCHANGE) + .queuePrefix(DOWNLOAD_REQUEST_QUEUE_PREFIX) .dlqName(DOWNLOAD_DLQ) .build(), TenantQueueConfiguration.builder() .listenerId(DOWNLOAD_COMPRESSION_LISTENER_ID) - .exchangeName(DOWNLOAD_COMPRESSION_EXCHANGE) - .queuePrefix(DOWNLOAD_COMPRESSION_QUEUE_PREFIX) + .exchangeName(DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE) + .queuePrefix(DOWNLOAD_COMPRESSION_REQUEST_QUEUE_PREFIX) .dlqName(DOWNLOAD_COMPRESSION_DLQ) .build(), TenantQueueConfiguration.builder() .listenerId(EXPORT_DOWNLOAD_LISTENER_ID) - .exchangeName(EXPORT_DOWNLOAD_EXCHANGE) - .queuePrefix(EXPORT_DOWNLOAD_QUEUE_PREFIX) + .exchangeName(EXPORT_DOWNLOAD_REQUEST_EXCHANGE) + .queuePrefix(EXPORT_DOWNLOAD_REQUEST_QUEUE_PREFIX) .dlqName(EXPORT_DOWNLOAD_DLQ) .build(), TenantQueueConfiguration.builder() .listenerId(ANALYSIS_FLAG_CALCULATION_LISTENER_ID) - .exchangeName(ANALYSIS_FLAG_CALCULATION_EXCHANGE) - .queuePrefix(ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX) + .exchangeName(ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE) + .queuePrefix(ANALYSIS_FLAG_CALCULATION_REQUEST_QUEUE_PREFIX) .build(), TenantQueueConfiguration.builder() .listenerId(REDACTION_RESPONSE_LISTENER_ID) @@ -96,8 +98,8 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece .build(), TenantQueueConfiguration.builder() .listenerId(OCR_STATUS_UPDATE_LISTENER_ID) - .exchangeName(OCR_STATUS_UPDATE_EXCHANGE) - .queuePrefix(OCR_STATUS_UPDATE_QUEUE_PREFIX) + .exchangeName(OCR_STATUS_UPDATE_RESPONSE_EXCHANGE) + .queuePrefix(OCR_STATUS_UPDATE_RESPONSE_QUEUE_PREFIX) .dlqName(OCR_STATUS_UPDATE_DLQ) .build(), TenantQueueConfiguration.builder() @@ -118,6 +120,18 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece .queuePrefix(AZURE_ENTITY_RESPONSE_QUEUE_PREFIX) .dlqName(AZURE_ENTITY_DLQ) .build(), + TenantQueueConfiguration.builder() + .listenerId(CHUNKING_RESPONSE_LISTENER_ID) + .exchangeName(CHUNKING_RESPONSE_EXCHANGE) + .queuePrefix(CHUNKING_RESPONSE_QUEUE_PREFIX) + .dlqName(CHUNKING_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(LLM_ENTITY_RESPONSE_LISTENER_ID) + .exchangeName(LLM_NER_RESPONSE_EXCHANGE) + .queuePrefix(LLM_NER_RESPONSE_QUEUE_PREFIX) + .dlqName(LLM_NER_DLQ) + .build(), TenantQueueConfiguration.builder() .listenerId(CV_ANALYSIS_RESPONSE_LISTENER_ID) .exchangeName(CV_ANALYSIS_RESPONSE_EXCHANGE) diff --git a/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts b/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts index 747c42390..b0ac8ac58 100644 --- a/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts +++ b/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts @@ -9,7 +9,9 @@ dependencies { api(project(":persistence-service-shared-api-v1")) api("com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.2") api("com.google.guava:guava:31.1-jre") - api("com.knecon.fforesight:mongo-database-commons:0.11.0") + api("com.knecon.fforesight:mongo-database-commons:0.12.0") { + exclude(group = "com.knecon.fforesight", module = "tenant-commons") + } api("org.springframework.boot:spring-boot-starter-data-mongodb:${springBootStarterVersion}") api("org.springframework.boot:spring-boot-starter-validation:3.1.3") testImplementation("com.iqser.red.commons:test-commons:2.1.0")