From 22bf27dd735887dfd558ec09156edfbcbf353752 Mon Sep 17 00:00:00 2001 From: Maverick Studer Date: Tue, 27 Aug 2024 11:29:14 +0200 Subject: [PATCH] RED-9331: Explore possibilities for fair upload / analysis processing per tenant --- ...tusProcessingUpdateInternalController.java | 5 +- .../build.gradle.kts | 4 +- .../configuration/MessagingConfiguration.java | 553 +++++++++--------- .../TenantMessagingConfigurationImpl.java | 10 + .../UserMessagingConfiguration.java | 16 +- .../ExportDownloadMessageReceiver.java | 3 +- .../service/DossierTemplateExportService.java | 3 +- .../service/FileExchangeExportService.java | 3 +- .../migration/SaasMigrationService.java | 14 +- .../migrations/QueueRenameMigration23.java | 101 ++++ .../v1/processor/service/DownloadService.java | 3 +- .../processor/service/FileStatusService.java | 61 +- .../v1/processor/service/IndexingService.java | 15 +- .../download/DownloadCompressingService.java | 5 +- .../DownloadCompressionMessageReceiver.java | 6 +- .../download/DownloadDLQMessageReceiver.java | 4 +- .../download/DownloadMessageReceiver.java | 5 +- .../download/DownloadPreparationService.java | 2 +- .../download/DownloadProcessorService.java | 3 +- .../DownloadReportMessageReceiver.java | 4 +- .../download/RedactionDlqMessageReceiver.java | 6 +- .../RedactionResultMessageReceiver.java | 4 +- .../AnalysisFlagCalculationSchedulerJob.java | 3 +- .../service/job/AutomaticAnalysisJob.java | 7 +- .../ManualRedactionService.java | 3 +- ...alysisFlagsCalculationMessageReceiver.java | 5 +- .../queue/AzureNerMessageReceiver.java | 14 +- .../queue/CvAnalysisMessageReceiver.java | 13 +- .../service/queue/ImageMessageReceiver.java | 13 +- .../LayoutParsingFinishedMessageReceiver.java | 11 +- .../service/queue/NerMessageReceiver.java | 14 +- .../queue/OCRProcessingMessageReceiver.java | 17 +- .../RedactionAnalysisResponseReceiver.java | 5 +- ...onServiceSaasMigrationMessageReceiver.java | 4 +- .../TenantExchangeMessageReceiverImpl.java | 166 ++++++ .../VisualLayoutParsingMessageReceiver.java | 15 +- .../AbstractPersistenceServerServiceTest.java | 11 +- .../build.gradle.kts | 2 +- 38 files changed, 733 insertions(+), 400 deletions(-) create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/TenantMessagingConfigurationImpl.java create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java create mode 100644 persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java diff --git a/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java b/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java index 7d530b5d5..9d6858e3f 100644 --- a/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java +++ b/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java @@ -5,7 +5,6 @@ import java.time.temporal.ChronoUnit; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; @@ -39,7 +38,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP fileStatusProcessingUpdateService.preprocessingFailed(dossierId, fileId, new FileErrorInfo("preprocessing failed", - MessagingConfiguration.PRE_PROCESSING_DLQ, + MessagingConfiguration.PREPROCESSING_DLQ, "pdftron-service", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); } @@ -125,7 +124,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP fileStatusProcessingUpdateService.indexingFailed(dossierId, fileId, new FileErrorInfo("indexing failed", - MessagingConfiguration.INDEXING_DQL, + MessagingConfiguration.INDEXING_DLQ, "search-service", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); } diff --git a/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts b/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts index 00ffac61d..1917cfdc6 100644 --- a/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts +++ b/persistence-service-v1/persistence-service-processor-v1/build.gradle.kts @@ -30,7 +30,7 @@ dependencies { exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1") exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1") } - api("com.knecon.fforesight:layoutparser-service-internal-api:0.149.0") { + api("com.knecon.fforesight:layoutparser-service-internal-api:0.161.0") { exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1") exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1") } @@ -45,7 +45,7 @@ dependencies { implementation("com.knecon.fforesight:llm-service-api:1.10.0") api("com.knecon.fforesight:jobs-commons:0.10.0") api("com.knecon.fforesight:database-tenant-commons:0.24.0") - api("com.knecon.fforesight:keycloak-commons:0.29.0") + api("com.knecon.fforesight:keycloak-commons:0.30.0") api("com.knecon.fforesight:tracing-commons:0.5.0") api("com.knecon.fforesight:swagger-commons:0.7.0") api("com.giffing.bucket4j.spring.boot.starter:bucket4j-spring-boot-starter:0.4.0") diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java index 249349aa1..146ac188d 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java @@ -1,13 +1,14 @@ package com.iqser.red.service.persistence.management.v1.processor.configuration; -import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE; -import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE; +import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_DLQ; +import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames; import com.knecon.fforesight.llm.service.QueueNames; import lombok.RequiredArgsConstructor; @@ -16,45 +17,46 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public class MessagingConfiguration { - public static final String REDACTION_QUEUE = "redactionQueue"; - public static final String REDACTION_PRIORITY_QUEUE = "redactionPriorityQueue"; - public static final String REDACTION_ANALYSIS_RESPONSE_QUEUE = "redactionAnalysisResponseQueue"; - public static final String REDACTION_DQL = "redactionDQL"; + // persistence-service + public static final String DOWNLOAD_QUEUE_PREFIX = "download_queue"; + public static final String DOWNLOAD_EXCHANGE = "download_exchange"; + public static final String DOWNLOAD_DLQ = "download_dlq"; - public static final String DOWNLOAD_QUEUE = "downloadQueue"; - public static final String DOWNLOAD_DLQ = "downloadDLQ"; - - public static final String DOWNLOAD_COMPRESSION_QUEUE = "download_compression_queue"; + public static final String DOWNLOAD_COMPRESSION_QUEUE_PREFIX = "download_compression_queue"; + public static final String DOWNLOAD_COMPRESSION_EXCHANGE = "download_compression_exchange"; public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_dlq"; - public static final String EXPORT_DOWNLOAD_QUEUE = "exportDownloadQueue"; - public static final String EXPORT_DOWNLOAD_DLQ = "exportDownloadDLQ"; + public static final String EXPORT_DOWNLOAD_QUEUE_PREFIX = "export_download_queue"; + public static final String EXPORT_DOWNLOAD_EXCHANGE = "export_download_exchange"; + public static final String EXPORT_DOWNLOAD_DLQ = "export_download_dlq"; - public static final String REPORT_QUEUE = "reportQueue"; - public static final String REPORT_DLQ = "reportDLQ"; + public static final String ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX = "analysis_flag_calculation_queue"; + public static final String ANALYSIS_FLAG_CALCULATION_EXCHANGE = "analysis_flag_calculation_exchange"; - public static final String REPORT_RESULT_QUEUE = "reportResultQueue"; - public static final String REPORT_RESULT_DLQ = "reportResultDLQ"; + public static final String REDACTION_RESPONSE_QUEUE_PREFIX = "redaction_response_queue"; + public static final String REDACTION_RESPONSE_EXCHANGE = "redaction_response_exchange"; - public static final String OCR_REQUEST_QUEUE = "ocr_request_queue"; - public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue"; - public static final String OCR_DLQ = "ocr_dead_letter_queue"; - public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue"; - public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue"; + public static final String PDFTRON_RESPONSE_QUEUE_PREFIX = "pdftron_response_queue"; + public static final String PDFTRON_RESPONSE_EXCHANGE = "pdftron_response_exchange"; - public static final String INDEXING_QUEUE = "indexingQueue"; - public static final String INDEXING_DQL = "indexingDQL"; + public static final String REPORT_RESPONSE_QUEUE_PREFIX = "report_response_queue"; + public static final String REPORT_RESPONSE_EXCHANGE = "report_response_exchange"; + public static final String REPORT_RESPONSE_DLQ = "report_response_dlq"; - public static final String DELETE_FROM_INDEX_QUEUE = "deleteFromIndexQueue"; - public static final String DELETE_FROM_INDEX_DLQ = "deleteFromIndexDLQ"; + public static final String OCR_RESPONSE_QUEUE_PREFIX = "ocr_response_queue"; + public static final String OCR_RESPONSE_EXCHANGE = "ocr_response_exchange"; + public static final String OCR_STATUS_UPDATE_QUEUE_PREFIX = "ocr_status_update_queue"; + public static final String OCR_STATUS_UPDATE_EXCHANGE = "ocr_status_update_exchange"; + public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_dlq"; - public static final String IMAGE_SERVICE_QUEUE = "image_request_queue"; - public static final String IMAGE_SERVICE_RESPONSE_QUEUE = "image_response_queue"; - public static final String IMAGE_SERVICE_DLQ = "image_dead_letter_queue"; + public static final String IMAGE_RESPONSE_QUEUE_PREFIX = "image_response_queue"; + public static final String IMAGE_RESPONSE_EXCHANGE = "image_response_exchange"; - public static final String NER_SERVICE_QUEUE = "entity_request_queue"; - public static final String NER_SERVICE_RESPONSE_QUEUE = "entity_response_queue"; - public static final String NER_SERVICE_DLQ = "entity_dead_letter_queue"; + public static final String ENTITY_RESPONSE_QUEUE_PREFIX = "entity_response_queue"; + public static final String ENTITY_RESPONSE_EXCHANGE = "entity_response_exchange"; + + public static final String AZURE_ENTITY_RESPONSE_QUEUE_PREFIX = "azure_entity_response_queue"; + public static final String AZURE_ENTITY_RESPONSE_EXCHANGE = "azure_entity_response_exchange"; public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue"; public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue"; @@ -64,104 +66,130 @@ public class MessagingConfiguration { public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE; public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ; - public static final String AZURE_NER_SERVICE_QUEUE = "azure_entity_request_queue"; - public static final String AZURE_NER_SERVICE_RESPONSE_QUEUE = "azure_entity_response_queue"; - public static final String AZURE_NER_SERVICE_DLQ = "azure_entity_dead_letter_queue"; + public static final String CV_ANALYSIS_RESPONSE_QUEUE_PREFIX = "cv_analysis_response_queue"; + public static final String CV_ANALYSIS_RESPONSE_EXCHANGE = "cv_analysis_response_exchange"; - public static final String PRE_PROCESSING_QUEUE = "preprocessingQueue"; - public static final String PRE_PROCESSING_DLQ = "preprocessingDLQ"; + public static final String VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "visual_layout_parsing_response_queue"; + public static final String VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE = "visual_layout_parsing_response_exchange"; - public static final String PDFTRON_QUEUE = "pdftron_queue"; + // pdftron-redaction-service + public static final String PREPROCESSING_EXCHANGE = "preprocessing_exchange"; + public static final String PREPROCESSING_DLQ = "preprocessing_dlq"; + + public static final String PDFTRON_REQUEST_EXCHANGE = "pdftron_request_exchange"; public static final String PDFTRON_DLQ = "pdftron_dlq"; - public static final String PDFTRON_RESULT_QUEUE = "pdftron_result_queue"; - public static final String CV_ANALYSIS_QUEUE = "cv_analysis_request_queue"; - public static final String CV_ANALYSIS_RESPONSE_QUEUE = "cv_analysis_response_queue"; - public static final String CV_ANALYSIS_DLQ = "cv_analysis_dead_letter_queue"; + // redaction-service + public static final String REDACTION_REQUEST_QUEUE_PREFIX = "redaction_request_queue"; + public static final String REDACTION_REQUEST_EXCHANGE = "redaction_request_exchange"; + public static final String REDACTION_PRIORITY_REQUEST_EXCHANGE = "redaction_priority_request_exchange"; + public static final String REDACTION_DLQ = "redaction_dlq"; - public static final String VISUAL_LAYOUT_PARSING_QUEUE = "visual_layout_parsing_service_queue"; - public static final String VISUAL_LAYOUT_RESPONSE_QUEUE = "visual_layout_parsing_service_response_queue"; - public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue"; - public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue"; + // redaction-report-service + public static final String REPORT_REQUEST_EXCHANGE = "report_request_exchange"; + public static final String REPORT_REQUEST_DLQ = "report_request_dlq"; + + // search-service + public static final String INDEXING_EXCHANGE = "indexing_exchange"; + public static final String INDEXING_DLQ = "indexing_dlq"; + + public static final String DELETE_FROM_INDEX_EXCHANGE = "delete_from_index_exchange"; + public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_dlq"; + + // ocr-services + public static final String OCR_REQUEST_EXCHANGE = "ocr_request_exchange"; + public static final String OCR_DLQ = "ocr_dlq"; + + // image-service + public static final String IMAGE_REQUEST_EXCHANGE = "image_request_exchange"; + public static final String IMAGE_DLQ = "image_dlq"; + + // entity-recognition-service + public static final String ENTITY_REQUEST_EXCHANGE = "entity_request_exchange"; + public static final String ENTITY_DLQ = "entity_dlq"; + + // azure-ner-service + public static final String AZURE_ENTITY_REQUEST_EXCHANGE = "azure_entity_request_exchange"; + public static final String AZURE_ENTITY_DLQ = "azure_entity_dlq"; + + // cv-analysis-service + public static final String CV_ANALYSIS_REQUEST_EXCHANGE = "cv_analysis_request_exchange"; + public static final String CV_ANALYSIS_DLQ = "cv_analysis_dlq"; + + // visual-layout-parsing-service + public static final String VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE = "visual_layout_parsing_request_exchange"; + public static final String VISUAL_LAYOUT_PARSING_DLQ = "visual_layout_parsing_dlq"; public static final String X_ERROR_INFO_HEADER = "x-error-message"; public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp"; - public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dead_letter_queue"; - - // --- Saas Migration, can be removed later ---- - - public static final String MIGRATION_QUEUE = "migrationQueue"; - public static final String MIGRATION_DLQ = "migrationDLQ"; + // ---- Saas Migration, can be removed later ---- + public static final String MIGRATION_REQUEST_QUEUE = "migrationQueue"; public static final String MIGRATION_RESPONSE_QUEUE = "migrationResponseQueue"; + public static final String MIGRATION_DLQ = "migrationDLQ"; + // ---- persistence-service ---- @Bean - public Queue migrationQueue() { + public DirectExchange downloadExchange() { - return QueueBuilder.durable(MIGRATION_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", MIGRATION_DLQ).maxPriority(2).build(); + return new DirectExchange(DOWNLOAD_EXCHANGE); } @Bean - public Queue migrationDLQ() { + public Queue downloadDeadLetterQueue() { - return QueueBuilder.durable(MIGRATION_DLQ).build(); + return QueueBuilder.durable(DOWNLOAD_DLQ).build(); } @Bean - public Queue migrationResponseQueue() { + public DirectExchange downloadCompressionExchange() { - return QueueBuilder.durable(MIGRATION_RESPONSE_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", MIGRATION_DLQ) - .maxPriority(2) - .build(); - } - - // --- End Saas Migration - - - @Bean - public Queue nerRequestQueue() { - - return QueueBuilder.durable(NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", NER_SERVICE_DLQ).build(); + return new DirectExchange(DOWNLOAD_COMPRESSION_EXCHANGE); } @Bean - public Queue nerResponseQueue() { + public Queue downloadCompressionQueueDLQ() { - return QueueBuilder.durable(NER_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", NER_SERVICE_DLQ).build(); + return QueueBuilder.durable(DOWNLOAD_COMPRESSION_DLQ).build(); } @Bean - public Queue nerResponseDLQ() { + public DirectExchange exportDownloadExchange() { - return QueueBuilder.durable(NER_SERVICE_DLQ).build(); + return new DirectExchange(EXPORT_DOWNLOAD_EXCHANGE); } @Bean - public Queue azureNerRequestQueue() { + public Queue exportDownloadDeadLetterQueue() { - return QueueBuilder.durable(AZURE_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", AZURE_NER_SERVICE_DLQ).build(); + return QueueBuilder.durable(EXPORT_DOWNLOAD_DLQ).build(); } @Bean - public Queue azureNerResponseQueue() { + public DirectExchange analysisFlagCalculationExchange() { - return QueueBuilder.durable(AZURE_NER_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", AZURE_NER_SERVICE_DLQ).build(); + return new DirectExchange(ANALYSIS_FLAG_CALCULATION_EXCHANGE); } @Bean - public Queue azureNerResponseDLQ() { + public DirectExchange redactionResponseExchange() { - return QueueBuilder.durable(AZURE_NER_SERVICE_DLQ).build(); + return new DirectExchange(REDACTION_RESPONSE_EXCHANGE); + } + + + @Bean + public DirectExchange reportResponseExchange() { + + return new DirectExchange(REPORT_RESPONSE_EXCHANGE); } @@ -214,130 +242,160 @@ public class MessagingConfiguration { @Bean - public Queue imageRequestQueue() { + public Queue reportResponseDLQ() { - return QueueBuilder.durable(IMAGE_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", IMAGE_SERVICE_DLQ).build(); + return QueueBuilder.durable(REPORT_RESPONSE_DLQ).build(); } @Bean - public Queue imageResponseQueue() { + public DirectExchange ocrResponseExchange() { - return QueueBuilder.durable(IMAGE_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", IMAGE_SERVICE_DLQ).build(); + return new DirectExchange(OCR_RESPONSE_EXCHANGE); } @Bean - public Queue imageResponseDLQ() { + public DirectExchange imageResponseExchange() { - return QueueBuilder.durable(IMAGE_SERVICE_DLQ).build(); + return new DirectExchange(IMAGE_RESPONSE_EXCHANGE); } @Bean - public Queue cvAnalysisRequestQueue() { + public DirectExchange entityResponseExchange() { - return QueueBuilder.durable(CV_ANALYSIS_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CV_ANALYSIS_DLQ).build(); + return new DirectExchange(ENTITY_RESPONSE_EXCHANGE); + } + + @Bean + public DirectExchange azureEntityResponseExchange() { + + return new DirectExchange(AZURE_ENTITY_RESPONSE_EXCHANGE); } @Bean - public Queue cvAnalysisResponseQueue() { + public DirectExchange cvAnalysisResponseExchange() { - return QueueBuilder.durable(CV_ANALYSIS_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CV_ANALYSIS_DLQ).build(); + return new DirectExchange(CV_ANALYSIS_RESPONSE_EXCHANGE); } @Bean - public Queue cvAnalysisResponseDLQ() { + public DirectExchange visualLayoutParsingResponseExchange() { - return QueueBuilder.durable(CV_ANALYSIS_DLQ).build(); + return new DirectExchange(VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE); } @Bean - public Queue visualLayoutParsingRequestQueue() { + public DirectExchange layoutParsingResponseExchange() { - return QueueBuilder.durable(VISUAL_LAYOUT_PARSING_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", VISUAL_LAYOUT_DLQ).build(); + return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE); + } + + + // ---- pdftron-redaction-service ---- + @Bean + public DirectExchange preprocessingExchange() { + + return new DirectExchange(PREPROCESSING_EXCHANGE); } @Bean - public Queue visualLayoutParsingResponseQueue() { + public Queue preprocessingDeadLetterQueue() { - return QueueBuilder.durable(VISUAL_LAYOUT_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", VISUAL_LAYOUT_DLQ).build(); + return QueueBuilder.durable(PREPROCESSING_DLQ).build(); } @Bean - public Queue visualLayoutParsingDLQ() { + public DirectExchange pdftronResponseExchange() { - return QueueBuilder.durable(VISUAL_LAYOUT_DLQ).build(); + return new DirectExchange(PDFTRON_RESPONSE_EXCHANGE); } @Bean - public Queue ocrStatusUpdateResponseQueue() { + public Queue pdfTronDLQ() { - return QueueBuilder.durable(OCR_STATUS_UPDATE_RESPONSE_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", OCR_STATUS_UPDATE_RESPONSE_DQL) - .build(); + return QueueBuilder.durable(PDFTRON_DLQ).build(); + } + + + // ---- redaction-service ---- + @Bean + public DirectExchange redactionRequestExchange() { + + return new DirectExchange(REDACTION_REQUEST_EXCHANGE); } @Bean - public Queue ocrStatusUpdateResponseDQL() { + public DirectExchange redactionPriorityRequestExchange() { - return QueueBuilder.durable(OCR_STATUS_UPDATE_RESPONSE_DQL).build(); + return new DirectExchange(REDACTION_PRIORITY_REQUEST_EXCHANGE); } @Bean - public Queue redactionQueue() { + public Queue redactionDLQ() { - return QueueBuilder.durable(REDACTION_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", REDACTION_DQL).maxPriority(2).build(); + return QueueBuilder.durable(REDACTION_DLQ).build(); + } + + + // ---- redaction-report-service ---- + @Bean + public DirectExchange reportRequestExchange() { + + return new DirectExchange(REPORT_REQUEST_EXCHANGE); } @Bean - public Queue redactionPriorityQueue() { + public Queue reportRequestDLQ() { - return QueueBuilder.durable(REDACTION_PRIORITY_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", REDACTION_DQL) - .maxPriority(2) - .build(); + return QueueBuilder.durable(REPORT_REQUEST_DLQ).build(); + } + + + // ---- search-service ---- + @Bean + public DirectExchange indexingExchange() { + + return new DirectExchange(INDEXING_EXCHANGE); } @Bean - public Queue redactionAnalysisResponseQueue() { + public Queue indexingDLQ() { - return QueueBuilder.durable(REDACTION_ANALYSIS_RESPONSE_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", REDACTION_DQL) - .maxPriority(2) - .build(); + return QueueBuilder.durable(INDEXING_DLQ).build(); } @Bean - public Queue ocrRequestQueue() { + public DirectExchange deleteFromIndexExchange() { - return QueueBuilder.durable(OCR_REQUEST_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", OCR_DLQ) - .withArgument("x-max-priority", 2) // Higher value is higher priority. - .maxPriority(2) - .build(); + return new DirectExchange(DELETE_FROM_INDEX_EXCHANGE); } @Bean - public Queue ocrResponseQueue() { + public Queue deleteFromIndexDLQ() { - return QueueBuilder.durable(OCR_RESPONSE_QUEUE).build(); + return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build(); + } + + + // ---- ocr-services ---- + @Bean + public DirectExchange ocrRequestExchange() { + + return new DirectExchange(OCR_REQUEST_EXCHANGE); } @@ -349,191 +407,134 @@ public class MessagingConfiguration { @Bean - public Queue downloadQueue() { + public DirectExchange ocrStatusUpdateExchange() { - return QueueBuilder.durable(DOWNLOAD_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", DOWNLOAD_DLQ).build(); + return new DirectExchange(OCR_STATUS_UPDATE_EXCHANGE); } @Bean - public Queue downloadDeadLetterQueue() { + public Queue ocrStatusUpdateDLQ() { - return QueueBuilder.durable(DOWNLOAD_DLQ).build(); + return QueueBuilder.durable(OCR_STATUS_UPDATE_DLQ).build(); + } + + + // ---- image-service ---- + @Bean + public DirectExchange imageRequestExchange() { + + return new DirectExchange(IMAGE_REQUEST_EXCHANGE); } @Bean - public Queue downloadCompressionQueue() { + public Queue imageDLQ() { - return QueueBuilder.durable(DOWNLOAD_COMPRESSION_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", DOWNLOAD_COMPRESSION_DLQ) - .build(); + return QueueBuilder.durable(IMAGE_DLQ).build(); + } + + + // ---- entity-recognition-service ---- + @Bean + public DirectExchange nerRequestExchange() { + + return new DirectExchange(ENTITY_REQUEST_EXCHANGE); } @Bean - public Queue downloadCompressionQueueDLQ() { + public Queue nerDLQ() { - return QueueBuilder.durable(DOWNLOAD_COMPRESSION_DLQ).build(); + return QueueBuilder.durable(ENTITY_DLQ).build(); + } + // ---- azure-ner-service ---- + @Bean + public DirectExchange azureNerRequestExchange() { + + return new DirectExchange(AZURE_ENTITY_REQUEST_EXCHANGE); } @Bean - public Queue exportDownloadQueue() { + public Queue azureNerDLQ() { - return QueueBuilder.durable(EXPORT_DOWNLOAD_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", EXPORT_DOWNLOAD_DLQ).build(); + return QueueBuilder.durable(AZURE_ENTITY_DLQ).build(); + } + + + // ---- cv-analysis-service ---- + @Bean + public DirectExchange cvAnalysisRequestExchange() { + + return new DirectExchange(CV_ANALYSIS_REQUEST_EXCHANGE); } @Bean - public Queue exportDownloadDeadLetterQueue() { + public Queue cvAnalysisDLQ() { - return QueueBuilder.durable(EXPORT_DOWNLOAD_DLQ).build(); + return QueueBuilder.durable(CV_ANALYSIS_DLQ).build(); + } + + + // ---- visual-layout-parsing-service ---- + @Bean + public DirectExchange visualLayoutParsingRequestExchange() { + + return new DirectExchange(VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE); } @Bean - public Queue reportQueue() { + public Queue visualLayoutParsingDLQ() { - return QueueBuilder.durable(REPORT_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", REPORT_DLQ) - .withArgument("x-max-priority", 2) - .maxPriority(2) - .build(); + return QueueBuilder.durable(VISUAL_LAYOUT_PARSING_DLQ).build(); + } + + + // ---- layoutparser-service ---- + @Bean + public DirectExchange layoutParsingRequestExchange() { + + return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE); } @Bean - public Queue reportDeadLetterQueue() { - - return QueueBuilder.durable(REPORT_DLQ).build(); - } - - - @Bean - public Queue reportResultQueue() { - - return QueueBuilder.durable(REPORT_RESULT_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", REPORT_RESULT_DLQ).build(); - } - - - @Bean - public Queue reportResultDeadLetterQueue() { - - return QueueBuilder.durable(REPORT_RESULT_DLQ).build(); - } - - - @Bean - public Queue analysisFlagCalculationQueue() { - - return QueueBuilder.durable(ANALYSIS_FLAG_CALCULATION_QUEUE).build(); - } - - - @Bean - public Queue indexingQueue() { - - return QueueBuilder.durable(INDEXING_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", INDEXING_DQL).maxPriority(2).build(); - } - - - @Bean - public Queue indexingDeadLetterQueue() { - - return QueueBuilder.durable(INDEXING_DQL).build(); - } - - - @Bean - public Queue deleteFromIndexQueue() { - - return QueueBuilder.durable(DELETE_FROM_INDEX_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", DELETE_FROM_INDEX_DLQ) - .maxPriority(2) - .build(); - } - - - @Bean - public Queue deleteFromIndexDLQ() { - - return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build(); - } - - - @Bean - public Queue preprocessingQueue() { - - return QueueBuilder.durable(PRE_PROCESSING_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PRE_PROCESSING_DLQ) - .withArgument("x-max-priority", 2) - .maxPriority(2) - .build(); - } - - - @Bean - public Queue preprocessingDeadLetterQueue() { - - return QueueBuilder.durable(PRE_PROCESSING_DLQ).build(); - } - - - @Bean - public Queue pdfTronQueue() { - - return QueueBuilder.durable(PDFTRON_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PDFTRON_DLQ) - .withArgument("x-max-priority", 2) - .maxPriority(2) - .build(); - } - - - @Bean - public Queue pdfTronDlq() { - - return QueueBuilder.durable(PDFTRON_DLQ).build(); - } - - - @Bean - public Queue pdfTronResultQueue() { - - return QueueBuilder.durable(PDFTRON_RESULT_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PDFTRON_DLQ) - .withArgument("x-max-priority", 2) - .maxPriority(2) - .build(); - } - - - @Bean - public Queue layoutparsingRequestQueue() { - - return QueueBuilder.durable(LAYOUT_PARSING_REQUEST_QUEUE)// - .withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LAYOUT_PARSING_DLQ).build(); - } - - - @Bean - public Queue layoutparsingResponseQueue() { - - return QueueBuilder.durable(LAYOUT_PARSING_FINISHED_EVENT_QUEUE).build(); - } - - - @Bean - public Queue layoutparsingDLQ() { + public Queue layoutParsingDLQ() { return QueueBuilder.durable(LAYOUT_PARSING_DLQ).build(); } + + // ---- Saas Migration ---- + @Bean + public Queue migrationQueue() { + + return QueueBuilder.durable(MIGRATION_REQUEST_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", MIGRATION_DLQ) + .maxPriority(2) + .build(); + } + + + @Bean + public Queue migrationDLQ() { + + return QueueBuilder.durable(MIGRATION_DLQ).build(); + } + + + @Bean + public Queue migrationResponseQueue() { + + return QueueBuilder.durable(MIGRATION_RESPONSE_QUEUE) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", MIGRATION_DLQ) + .maxPriority(2) + .build(); + } + } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/TenantMessagingConfigurationImpl.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/TenantMessagingConfigurationImpl.java new file mode 100644 index 000000000..4a4f41e24 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/TenantMessagingConfigurationImpl.java @@ -0,0 +1,10 @@ +package com.iqser.red.service.persistence.management.v1.processor.configuration; + +import org.springframework.context.annotation.Configuration; + +import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration; + +@Configuration +public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration { + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java index e3f399e5f..c8724e8a5 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/UserMessagingConfiguration.java @@ -22,7 +22,7 @@ public class UserMessagingConfiguration { public static final String PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE = "persistence-service-user-status-changed-queue"; public static final String PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE = "persistence-service-user-roles-updated-queue"; public static final String PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE = "persistence-service-user-own-profile-updated-queue"; - public static final String PERSISTENCE_SERVICE_USER_EVENTS_DQL = "persistence-service-user-events-dql"; + public static final String PERSISTENCE_SERVICE_USER_EVENTS_DLQ = "persistence-service-user-events-dlq"; @Bean("persistenceServiceUserRolesUpdatedQueue") @@ -30,7 +30,7 @@ public class UserMessagingConfiguration { return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE) .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL) + .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ) .build(); } @@ -48,7 +48,7 @@ public class UserMessagingConfiguration { return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE) .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL) + .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ) .build(); } @@ -66,7 +66,7 @@ public class UserMessagingConfiguration { return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_UPDATED_QUEUE) .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL) + .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ) .build(); } @@ -84,7 +84,7 @@ public class UserMessagingConfiguration { return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_DELETED_QUEUE) .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL) + .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ) .build(); } @@ -102,7 +102,7 @@ public class UserMessagingConfiguration { return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE) .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL) + .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ) .build(); } @@ -120,7 +120,7 @@ public class UserMessagingConfiguration { return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_CREATED_QUEUE) .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL) + .withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ) .build(); } @@ -136,7 +136,7 @@ public class UserMessagingConfiguration { @Bean public Queue persistenceServiceUserEventsDLQ() { - return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_EVENTS_DQL).build(); + return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_EVENTS_DLQ).build(); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/ExportDownloadMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/ExportDownloadMessageReceiver.java index 81d2bb711..0ac475961 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/ExportDownloadMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/ExportDownloadMessageReceiver.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) public class ExportDownloadMessageReceiver { + public static final String EXPORT_DOWNLOAD_LISTENER_ID = "export-download-listener"; static long MB = 1024 * 1024; DossierTemplateExportService dossierTemplateService; FileExchangeExportService fileExchangeExportService; @@ -36,7 +37,7 @@ public class ExportDownloadMessageReceiver { @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.EXPORT_DOWNLOAD_QUEUE) + @RabbitListener(id = EXPORT_DOWNLOAD_LISTENER_ID) public void receive(ExportDownloadMessage downloadJob) throws JsonProcessingException { var download = downloadStatusPersistenceService.getStatus(downloadJob.getStorageId()); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java index 514a5d45a..ef8d39377 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/DossierTemplateExportService.java @@ -64,6 +64,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.DossierStatusInfo; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileAttributeConfig; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.legalbasis.LegalBasis; +import com.knecon.fforesight.tenantcommons.TenantContext; import io.micrometer.observation.annotation.Observed; import lombok.AccessLevel; @@ -118,7 +119,7 @@ public class DossierTemplateExportService { private void addToExportDownloadQueue(ExportDownloadMessage downloadJob, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_QUEUE, downloadJob, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { message.getMessageProperties().setPriority(priority); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java index d3d8d38ad..f8d206918 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/dataexchange/service/FileExchangeExportService.java @@ -22,6 +22,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.DownloadRes import com.iqser.red.service.persistence.service.v1.api.shared.model.FileExchangeExportRequest; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.Dossier; import com.knecon.fforesight.keycloakcommons.security.KeycloakSecurity; +import com.knecon.fforesight.tenantcommons.TenantContext; import io.micrometer.observation.annotation.Observed; import lombok.AccessLevel; @@ -74,7 +75,7 @@ public class FileExchangeExportService { private void addToExportDownloadQueue(ExportDownloadMessage downloadJob) { - rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_QUEUE, downloadJob, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { message.getMessageProperties().setPriority(1); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/SaasMigrationService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/SaasMigrationService.java index c7f98794b..87530d203 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/SaasMigrationService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/SaasMigrationService.java @@ -1,7 +1,6 @@ package com.iqser.red.service.persistence.management.v1.processor.migration; -import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_QUEUE; -import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE; +import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_REQUEST_QUEUE; import java.time.OffsetDateTime; import java.util.List; @@ -39,6 +38,7 @@ import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist; import com.iqser.red.storage.commons.service.StorageService; import com.knecon.fforesight.databasetenantcommons.providers.TenantSyncService; import com.knecon.fforesight.databasetenantcommons.providers.events.TenantSyncEvent; +import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames; import com.knecon.fforesight.tenantcommons.TenantContext; import com.knecon.fforesight.tenantcommons.TenantProvider; import com.knecon.fforesight.tenantcommons.model.UpdateDetailsRequest; @@ -125,7 +125,7 @@ public class SaasMigrationService implements TenantSyncService { var layoutParsingRequest = layoutParsingRequestFactory.build(file.getDossierId(), file.getFileId(), false); - rabbitTemplate.convertAndSend(LAYOUT_PARSING_REQUEST_QUEUE, layoutParsingRequest); + rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), layoutParsingRequest); numberOfFiles++; @@ -160,7 +160,7 @@ public class SaasMigrationService implements TenantSyncService { log.info("Starting Migration for dossierId {} and fileId {}", dossierId, fileId); saasMigrationStatusPersistenceService.createMigrationRequiredStatus(dossierId, fileId); var layoutParsingRequest = layoutParsingRequestFactory.build(dossierId, fileId, false); - rabbitTemplate.convertAndSend(LAYOUT_PARSING_REQUEST_QUEUE, layoutParsingRequest); + rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), layoutParsingRequest); } @@ -183,7 +183,7 @@ public class SaasMigrationService implements TenantSyncService { String dossierTemplateId = dossierService.getDossierById(dossierId).getDossierTemplateId(); - rabbitTemplate.convertAndSend(MIGRATION_QUEUE, + rabbitTemplate.convertAndSend(MIGRATION_REQUEST_QUEUE, MigrationRequest.builder() .dossierTemplateId(dossierTemplateId) .dossierId(dossierId) @@ -230,13 +230,13 @@ public class SaasMigrationService implements TenantSyncService { } - public void handleError(String dossierId, String fileId, String errorCause, String retryQueue) { + public void handleError(String dossierId, String fileId, String errorCause, String retryExchange) { var migrationEntry = saasMigrationStatusPersistenceService.findById(fileId); Integer numErrors = migrationEntry.getProcessingErrorCounter(); if (numErrors != null && numErrors <= settings.getMaxErrorRetries()) { saasMigrationStatusPersistenceService.updateErrorCounter(fileId, numErrors + 1, errorCause); - rabbitTemplate.convertAndSend(retryQueue, MigrationRequest.builder().dossierId(dossierId).fileId(fileId).build()); + rabbitTemplate.convertAndSend(retryExchange, TenantContext.getTenantId(), MigrationRequest.builder().dossierId(dossierId).fileId(fileId).build()); log.error("Retrying error during saas migration for tenant {} dossier {} and file {}, cause {}", TenantContext.getTenantId(), dossierId, fileId, errorCause); } else { saasMigrationStatusPersistenceService.updateErrorStatus(fileId, errorCause); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java new file mode 100644 index 000000000..7b4a45d64 --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/migration/migrations/QueueRenameMigration23.java @@ -0,0 +1,101 @@ +package com.iqser.red.service.persistence.management.v1.processor.migration.migrations; + +import java.util.List; + +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.stereotype.Service; + +import com.iqser.red.service.persistence.management.v1.processor.migration.Migration; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Setter +@Service +public class QueueRenameMigration23 extends Migration { + + private final AmqpAdmin amqpAdmin; + + private static final String NAME = "Migration for renaming of most queues"; + private static final long VERSION = 23; + private static final List queueNames = List.of("analysis_flag_calculation_queue", + "cv_analysis_dead_letter_queue", + "cv_analysis_request_queue", + "cv_analysis_response_queue", + "deleteFromIndexDLQ", + "deleteFromIndexQueue", + "downloadDLQ", + "downloadQueue", + "download_compression_dlq", + "download_compression_queue", + "entity_dead_letter_queue", + "entity_request_queue", + "entity_response_queue", + "exportDownloadDLQ", + "exportDownloadQueue", + "image_dead_letter_queue", + "image_request_queue", + "image_response_queue", + "indexingDQL", + "indexingQueue", + "layout_parsing_dead_letter_queue", + "layout_parsing_request_queue", + "layout_parsing_response_queue", + "migrationDLQ", + "migrationQueue", + "migrationResponseQueue", + "mongo-tenant-created", + "mongo-tenant-created-dlq", + "ocr_dead_letter_queue", + "ocr_request_queue", + "ocr_response_queue", + "ocr_status_update_dead_letter_queue", + "ocr_status_update_response_queue", + "pdftron_dlq", + "pdftron_queue", + "pdftron_result_queue", + "persistence-service-user-created-queue", + "persistence-service-user-deleted-queue", + "persistence-service-user-events-dql", + "persistence-service-user-own-profile-updated-queue", + "persistence-service-user-roles-updated-queue", + "persistence-service-user-status-changed-queue", + "persistence-service-user-updated-queue", + "preprocessingDLQ", + "preprocessingQueue", + "redactionAnalysisResponseQueue", + "redactionDQL", + "redactionPriorityQueue", + "redactionQueue", + "reportDLQ", + "reportQueue", + "reportResultDLQ", + "reportResultQueue", + "tenant-created", + "tenant-created-dlq", + "tenant-delete-dlq", + "tenant-delete-queue", + "tenant-sync", + "tenant-sync-dlq", + "visual_layout_parsing_service_dead_letter_queue", + "visual_layout_parsing_service_queue", + "visual_layout_parsing_service_response_queue"); + + + public QueueRenameMigration23(AmqpAdmin amqpAdmin) { + + super(NAME, VERSION); + this.amqpAdmin = amqpAdmin; + } + + + @Override + protected void migrate() { + + log.info("Migration: Deleting queues..."); + queueNames.forEach(amqpAdmin::deleteQueue); + + } + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java index 55af4280e..5ce2fbedd 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/DownloadService.java @@ -28,6 +28,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadRequest; import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatus; import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadWithOptionRequest; +import com.knecon.fforesight.tenantcommons.TenantContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -139,7 +140,7 @@ public class DownloadService { private void addToDownloadQueue(DownloadJob downloadJob, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_QUEUE, downloadJob, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> { message.getMessageProperties().setPriority(priority); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java index 08c23a611..8f1cf4821 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java @@ -1,46 +1,41 @@ package com.iqser.red.service.persistence.management.v1.processor.service; import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE; -import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import com.iqser.red.service.pdftron.redaction.v1.api.model.ApplicationType; -import com.iqser.red.service.persistence.management.v1.processor.exception.BadRequestException; -import com.iqser.red.service.persistence.management.v1.processor.exception.NotFoundException; - -import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus; -import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType; -import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory; -import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService; -import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService; - import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.google.common.collect.Sets; +import com.iqser.red.service.pdftron.redaction.v1.api.model.ApplicationType; import com.iqser.red.service.pdftron.redaction.v1.api.model.ProcessUntouchedDocumentRequest; import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileAttributeEntity; import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileEntity; +import com.iqser.red.service.persistence.management.v1.processor.exception.BadRequestException; import com.iqser.red.service.persistence.management.v1.processor.exception.InternalServerErrorException; +import com.iqser.red.service.persistence.management.v1.processor.exception.NotFoundException; import com.iqser.red.service.persistence.management.v1.processor.model.CvAnalysisServiceRequest; import com.iqser.red.service.persistence.management.v1.processor.model.FileIdentifier; import com.iqser.red.service.persistence.management.v1.processor.model.ManualChangesQueryOptions; -import com.iqser.red.service.persistence.service.v1.api.shared.model.NerServiceRequest; import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse; import com.iqser.red.service.persistence.management.v1.processor.model.VisualLayoutParsingServiceRequest; import com.iqser.red.service.persistence.management.v1.processor.model.image.ImageServiceRequest; +import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus; +import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType; +import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory; import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.LayoutParsingRequestFactory; +import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService; import com.iqser.red.service.persistence.management.v1.processor.service.manualredactions.ManualRedactionProviderService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierPersistenceService; +import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileAttributeConfigPersistenceService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileStatusPersistenceService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.ViewedPagesPersistenceService; @@ -59,6 +54,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequ import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeResult; import com.iqser.red.service.persistence.service.v1.api.shared.model.FileAttribute; import com.iqser.red.service.persistence.service.v1.api.shared.model.MessageType; +import com.iqser.red.service.persistence.service.v1.api.shared.model.NerServiceRequest; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileModel; import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType; @@ -68,7 +64,9 @@ import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.Com import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService; import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter; import com.knecon.fforesight.llm.service.LlmNerMessage; +import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames; import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest; +import com.knecon.fforesight.tenantcommons.TenantContext; import jakarta.transaction.Transactional; import lombok.AccessLevel; @@ -279,7 +277,7 @@ public class FileStatusService { var layoutParsingRequest = layoutParsingRequestFactory.build(dossierId, fileId, priority); setStatusFullProcessing(fileId); log.info("Add file: {} from dossier {} to layout parsing request queue", fileId, dossierId); - rabbitTemplate.convertAndSend(LAYOUT_PARSING_REQUEST_QUEUE, layoutParsingRequest); + rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), layoutParsingRequest); sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity); return; } @@ -351,9 +349,9 @@ public class FileStatusService { } if (priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_QUEUE, analyseRequest); + rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_REQUEST_EXCHANGE, TenantContext.getTenantId(), analyseRequest); } else { - rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_QUEUE, analyseRequest); + rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_REQUEST_EXCHANGE, TenantContext.getTenantId(), analyseRequest); } sendAnalysisEvent(dossierId, fileId, fileEntity); } @@ -385,7 +383,8 @@ public class FileStatusService { private void addToVisualLayoutParsingQueue(String dossierId, String fileId) { fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.VISUAL_LAYOUT_PARSING_ANALYZING); - rabbitTemplate.convertAndSend(MessagingConfiguration.VISUAL_LAYOUT_PARSING_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE, + TenantContext.getTenantId(), VisualLayoutParsingServiceRequest.builder() .dossierId(dossierId) .fileId(fileId) @@ -412,7 +411,7 @@ public class FileStatusService { setStatusPreProcessingQueued(fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.PRE_PROCESSING_QUEUE, processUntouchedDocumentRequest); + rabbitTemplate.convertAndSend(MessagingConfiguration.PREPROCESSING_EXCHANGE, TenantContext.getTenantId(), processUntouchedDocumentRequest); } @@ -426,7 +425,8 @@ public class FileStatusService { fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.FIGURE_DETECTION_ANALYZING); - rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_REQUEST_EXCHANGE, + TenantContext.getTenantId(), CvAnalysisServiceRequest.builder() .dossierId(dossierId) .fileId(fileId) @@ -445,7 +445,8 @@ public class FileStatusService { fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.TABLE_PARSING_ANALYZING); - rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_REQUEST_EXCHANGE, + TenantContext.getTenantId(), CvAnalysisServiceRequest.builder() .dossierId(dossierId) .fileId(fileId) @@ -463,7 +464,8 @@ public class FileStatusService { public void addToImageQueue(String dossierId, String fileId) { setStatusImageAnalyzing(fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.IMAGE_SERVICE_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.IMAGE_REQUEST_EXCHANGE, + TenantContext.getTenantId(), ImageServiceRequest.builder() .dossierId(dossierId) .fileId(fileId) @@ -496,7 +498,8 @@ public class FileStatusService { protected void addToNerQueue(String dossierId, String fileId) { setStatusNerAnalyzing(fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.NER_SERVICE_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.ENTITY_REQUEST_EXCHANGE, + TenantContext.getTenantId(), NerServiceRequest.builder() .dossierId(dossierId) .fileId(fileId) @@ -533,7 +536,8 @@ public class FileStatusService { protected void addToAzureNerQueue(String dossierId, String fileId) { setStatusNerAnalyzing(fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.AZURE_NER_SERVICE_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.AZURE_ENTITY_REQUEST_EXCHANGE, + TenantContext.getTenantId(), NerServiceRequest.builder() .dossierId(dossierId) .fileId(fileId) @@ -624,10 +628,13 @@ public class FileStatusService { public void addToOcrQueue(String dossierId, String fileId, int priority) { var removeWatermark = dossierTemplatePersistenceService.getDossierTemplate(dossierPersistenceService.getDossierTemplateId(dossierId)).isRemoveWatermark(); - rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId, removeWatermark), message -> { - message.getMessageProperties().setPriority(priority); - return message; - }); + rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_EXCHANGE, + TenantContext.getTenantId(), + new DocumentRequest(dossierId, fileId, removeWatermark), + message -> { + message.getMessageProperties().setPriority(priority); + return message; + }); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java index 5b71c0a21..2d3e0f1f7 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/IndexingService.java @@ -15,6 +15,7 @@ import com.iqser.red.service.persistence.management.v1.processor.entity.dossier. import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileStatusPersistenceService; import com.iqser.red.service.search.v1.model.IndexMessage; import com.iqser.red.service.search.v1.model.IndexMessageType; +import com.knecon.fforesight.tenantcommons.TenantContext; import lombok.RequiredArgsConstructor; @@ -66,7 +67,8 @@ public class IndexingService { public void addToIndexingQueue(IndexMessageType indexMessageType, String dossierTemplateId, String dossierId, String fileId, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_QUEUE, + rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_EXCHANGE, + TenantContext.getTenantId(), IndexMessage.builder().messageType(indexMessageType).dossierTemplateId(dossierTemplateId).dossierId(dossierId).fileId(fileId).build(), message -> { message.getMessageProperties().setPriority(priority); @@ -77,10 +79,13 @@ public class IndexingService { public void addToDeleteFromIndexQueue(String dossierId, String fileId, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_QUEUE, IndexMessage.builder().dossierId(dossierId).fileId(fileId).build(), message -> { - message.getMessageProperties().setPriority(priority); - return message; - }); + rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_EXCHANGE, + TenantContext.getTenantId(), + IndexMessage.builder().dossierId(dossierId).fileId(fileId).build(), + message -> { + message.getMessageProperties().setPriority(priority); + return message; + }); } } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java index 20e920c8a..b0414806d 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressingService.java @@ -8,6 +8,7 @@ import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJ import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService; import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadStatusRepository; import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue; +import com.knecon.fforesight.tenantcommons.TenantContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -29,7 +30,9 @@ public class DownloadCompressingService { if (updated == 1) { websocketService.sendDownloadEvent(downloadId, userId, DownloadStatusValue.COMPRESSING); - rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE, DownloadJob.builder().storageId(downloadId).userId(userId).build()); + rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_EXCHANGE, + TenantContext.getTenantId(), + DownloadJob.builder().storageId(downloadId).userId(userId).build()); } } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressionMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressionMessageReceiver.java index d9ab7b453..138e97085 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressionMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadCompressionMessageReceiver.java @@ -1,13 +1,11 @@ package com.iqser.red.service.persistence.management.v1.processor.service.download; -import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob; import lombok.RequiredArgsConstructor; @@ -19,13 +17,15 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class DownloadCompressionMessageReceiver { + public static final String DOWNLOAD_COMPRESSION_LISTENER_ID = "download-compression-listener"; + private final DownloadCompressingService downloadCompressingService; private final ObjectMapper objectMapper; @SneakyThrows @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE) + @RabbitListener(id = DOWNLOAD_COMPRESSION_LISTENER_ID) public void receive(DownloadJob downloadJob) throws JsonProcessingException { downloadCompressingService.compressDownload(downloadJob.getStorageId()); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java index b59bb9e75..8616882d7 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadDLQMessageReceiver.java @@ -56,7 +56,7 @@ public class DownloadDLQMessageReceiver { @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.REPORT_DLQ) + @RabbitListener(queues = MessagingConfiguration.REPORT_REQUEST_DLQ) public void handleReportDlqMessage(Message failedMessage) { var reportRequestMessage = objectMapper.readValue(failedMessage.getBody(), ReportRequestMessage.class); @@ -72,7 +72,7 @@ public class DownloadDLQMessageReceiver { } - @RabbitListener(queues = MessagingConfiguration.REPORT_RESULT_DLQ) + @RabbitListener(queues = MessagingConfiguration.REPORT_RESPONSE_DLQ) public void handleReportResponseDlqMessage(ReportResultMessage reportResultMessage) { log.warn("Handling report request in DLQ userId: {} storageId: {} - setting status to error!", reportResultMessage.getUserId(), reportResultMessage.getDownloadId()); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadMessageReceiver.java index dfe25a1e9..1aadb2b42 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadMessageReceiver.java @@ -8,7 +8,6 @@ import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob; import lombok.RequiredArgsConstructor; @@ -20,13 +19,15 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class DownloadMessageReceiver { + public static final String DOWNLOAD_LISTENER_ID = "download-listener"; + private final DownloadProcessorService downloadProcessorService; private final ObjectMapper objectMapper; @SneakyThrows @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.DOWNLOAD_QUEUE) + @RabbitListener(id = DOWNLOAD_LISTENER_ID) public void receive(Message message) throws JsonProcessingException { DownloadJob downloadJob = objectMapper.readValue(message.getBody(), DownloadJob.class); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadPreparationService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadPreparationService.java index 091825021..b8df4091d 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadPreparationService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadPreparationService.java @@ -86,7 +86,7 @@ public class DownloadPreparationService { .forEach(fileEntity -> { RedactionMessage message = messageBuilder.fileId(fileEntity.getId()).unapprovedFile(fileEntity.getWorkflowStatus() != WorkflowStatus.APPROVED).build(); log.info("Sending redaction request for downloadId:{} fileId:{} to pdftron-redaction-queue", downloadId, fileEntity.getId()); - rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message); + rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_REQUEST_EXCHANGE, TenantContext.getTenantId(), message); }); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadProcessorService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadProcessorService.java index a5f91152c..b41086b4b 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadProcessorService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadProcessorService.java @@ -17,6 +17,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.websock import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService; import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue; import com.iqser.red.service.redaction.report.v1.api.model.ReportRequestMessage; +import com.knecon.fforesight.tenantcommons.TenantContext; import jakarta.transaction.Transactional; import lombok.RequiredArgsConstructor; @@ -64,7 +65,7 @@ public class DownloadProcessorService { private void addReportQueue(ReportRequestMessage reportRequestMessage, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.REPORT_QUEUE, reportRequestMessage, message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.REPORT_REQUEST_EXCHANGE, TenantContext.getTenantId(), reportRequestMessage, message -> { message.getMessageProperties().setPriority(priority); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadReportMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadReportMessageReceiver.java index 9ba04e78e..6a918470e 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadReportMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/DownloadReportMessageReceiver.java @@ -23,13 +23,15 @@ import lombok.extern.slf4j.Slf4j; @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class DownloadReportMessageReceiver { + public static final String REPORT_RESPONSE_LISTENER_ID = "report-response-listener"; + ObjectMapper objectMapper; DownloadPreparationService downloadPreparationService; @SneakyThrows @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.REPORT_RESULT_QUEUE) + @RabbitListener(id = REPORT_RESPONSE_LISTENER_ID) public void receive(Message message) { ReportResultMessage reportResultMessage = objectMapper.readValue(message.getBody(), ReportResultMessage.class); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionDlqMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionDlqMessageReceiver.java index ab0928b1d..cb2167286 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionDlqMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionDlqMessageReceiver.java @@ -8,7 +8,7 @@ import com.iqser.red.service.persistence.management.v1.processor.configuration.M import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService; import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings; import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue; -import com.iqser.red.service.redaction.report.v1.api.model.ReportRequestMessage; +import com.knecon.fforesight.tenantcommons.TenantContext; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; @@ -64,11 +64,11 @@ public class RedactionDlqMessageReceiver { downloadPreparationService.clearRedactionStatusEntries(redactionMessage.getDownloadId()); } else { downloadPreparationService.increaseProcessingErrorCounter(redactionMessage, numErrors); - rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message); + rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_REQUEST_EXCHANGE, TenantContext.getTenantId(), message); } } else { downloadPreparationService.markFileAsProcessingError(redactionMessage); - rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message); + rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_REQUEST_EXCHANGE, TenantContext.getTenantId(), message); } } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionResultMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionResultMessageReceiver.java index 2e9f19bbd..b8a024b23 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionResultMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/download/RedactionResultMessageReceiver.java @@ -23,13 +23,15 @@ import org.springframework.stereotype.Service; @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class RedactionResultMessageReceiver { + public static final String PDFTRON_RESPONSE_LISTENER_ID = "pdftron-response-listener"; + ObjectMapper objectMapper; DownloadPreparationService downloadPreparationService; @SneakyThrows @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.PDFTRON_RESULT_QUEUE) + @RabbitListener(id = PDFTRON_RESPONSE_LISTENER_ID) public void receive(Message message) throws JsonProcessingException { RedactionResultMessage redactionResultMessage = objectMapper.readValue(message.getBody(), RedactionResultMessage.class); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java index 4b1c80f4d..449dfdcc9 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AnalysisFlagCalculationSchedulerJob.java @@ -51,7 +51,8 @@ import lombok.extern.slf4j.Slf4j; log.debug("Found {} files which require analysis flag calculation. Tenant: {}", fileIdentifiers.size(), tenant.getDisplayName()); - fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_QUEUE, + fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_EXCHANGE, + TenantContext.getTenantId(), new AnalysisFlagCalculationMessage(fileIdentifier.dossierId(), fileIdentifier.fileId()))); TenantContext.clear(); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AutomaticAnalysisJob.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AutomaticAnalysisJob.java index 3bca9e1e5..48f8ee47e 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AutomaticAnalysisJob.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/job/AutomaticAnalysisJob.java @@ -74,11 +74,12 @@ public class AutomaticAnalysisJob implements Job { return; } - var redactionQueueInfo = amqpAdmin.getQueueInfo(MessagingConfiguration.REDACTION_QUEUE); + String queueName = MessagingConfiguration.REDACTION_REQUEST_QUEUE_PREFIX + "_" + tenant.getTenantId(); + var redactionQueueInfo = amqpAdmin.getQueueInfo(queueName); if (redactionQueueInfo != null) { log.debug("[Tenant:{}] Checking queue status to see if background analysis can happen. Currently {} holds {} elements and has {} consumers", tenant.getTenantId(), - MessagingConfiguration.REDACTION_QUEUE, + queueName, redactionQueueInfo.getMessageCount(), redactionQueueInfo.getConsumerCount()); // only 1 file in queue @@ -105,7 +106,7 @@ public class AutomaticAnalysisJob implements Job { } } else { - log.info("[Tenant:{}] Failed to obtain queue info for queue: {}", TenantContext.getTenantId(), MessagingConfiguration.REDACTION_QUEUE); + log.info("[Tenant:{}] Failed to obtain queue info for queue: {}", TenantContext.getTenantId(), queueName); } }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/manualredactions/ManualRedactionService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/manualredactions/ManualRedactionService.java index 0085e2102..d1c6fe765 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/manualredactions/ManualRedactionService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/manualredactions/ManualRedactionService.java @@ -81,6 +81,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.manual.Resi import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService; import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter; import com.knecon.fforesight.keycloakcommons.security.KeycloakSecurity; +import com.knecon.fforesight.tenantcommons.TenantContext; import io.micrometer.observation.annotation.Observed; import lombok.AccessLevel; @@ -667,7 +668,7 @@ public class ManualRedactionService { .build(); log.info("Sending Surrounding Text Analysis for unprocessed manual redactions: {} for file: {} ", manualRedactions, fileId); - rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_QUEUE, analyseRequest); + rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_REQUEST_EXCHANGE, TenantContext.getTenantId(), analyseRequest); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AnalysisFlagsCalculationMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AnalysisFlagsCalculationMessageReceiver.java index 8ed2d5eca..cd404f848 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AnalysisFlagsCalculationMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AnalysisFlagsCalculationMessageReceiver.java @@ -6,7 +6,6 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; -import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.model.AnalysisFlagCalculationMessage; import com.iqser.red.service.persistence.management.v1.processor.service.AnalysisFlagsCalculationService; @@ -22,13 +21,15 @@ import lombok.extern.slf4j.Slf4j; @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) public class AnalysisFlagsCalculationMessageReceiver { + public static final String ANALYSIS_FLAG_CALCULATION_LISTENER_ID = "analysis-flag-calculation-listener"; + AnalysisFlagsCalculationService analysisFlagsCalculationService; ObjectMapper mapper; @SneakyThrows @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_QUEUE) + @RabbitListener(id = ANALYSIS_FLAG_CALCULATION_LISTENER_ID) public void receiveAnalysisFlagCalculationMessage(Message message) { var analysisFlagCalculationMessage = mapper.readValue(message.getBody(), AnalysisFlagCalculationMessage.class); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AzureNerMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AzureNerMessageReceiver.java index 06ecd074b..350aaee5f 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AzureNerMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/AzureNerMessageReceiver.java @@ -27,14 +27,15 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class AzureNerMessageReceiver { + public static final String AZURE_ENTITY_RESPONSE_LISTENER_ID = "azure-entity-response-listener"; + private final FileStatusService fileStatusService; private final ObjectMapper objectMapper; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @SneakyThrows - @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.AZURE_NER_SERVICE_RESPONSE_QUEUE) + @RabbitListener(id = AZURE_ENTITY_RESPONSE_LISTENER_ID) public void receive(Message message) { HashMap entityResponse = objectMapper.readValue(message.getBody(), new TypeReference<>() { @@ -43,13 +44,12 @@ public class AzureNerMessageReceiver { String dossierId = (String) entityResponse.get("dossierId"); String fileId = (String) entityResponse.get("fileId"); - log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId); + log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_ENTITY_RESPONSE_EXCHANGE, dossierId, fileId); fileStatusService.setStatusAnalyse(dossierId, fileId, false); } - @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.AZURE_NER_SERVICE_DLQ) + @RabbitListener(queues = MessagingConfiguration.AZURE_ENTITY_DLQ) public void handleDLQMessage(Message failedMessage) throws IOException { HashMap entityResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() { @@ -57,11 +57,11 @@ public class AzureNerMessageReceiver { String dossierId = (String) entityResponse.get("dossierId"); String fileId = (String) entityResponse.get("fileId"); - log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_NER_SERVICE_DLQ, dossierId, fileId); + log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_ENTITY_DLQ, dossierId, fileId); fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId, new FileErrorInfo("azure ner service failed", - MessagingConfiguration.AZURE_NER_SERVICE_DLQ, + MessagingConfiguration.AZURE_ENTITY_DLQ, "azure-ner-service", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/CvAnalysisMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/CvAnalysisMessageReceiver.java index 0fbc1d432..6e5871ca8 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/CvAnalysisMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/CvAnalysisMessageReceiver.java @@ -24,6 +24,8 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class CvAnalysisMessageReceiver { + public static final String CV_ANALYSIS_RESPONSE_LISTENER_ID = "cv-analysis-response-listener"; + private final ObjectMapper objectMapper; private final FileStatusService fileStatusService; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @@ -31,13 +33,16 @@ public class CvAnalysisMessageReceiver { @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.CV_ANALYSIS_RESPONSE_QUEUE) + @RabbitListener(id = CV_ANALYSIS_RESPONSE_LISTENER_ID) public void receive(CvAnalysisServiceResponse response) { addFileIdToTrace(response.getFileId()); fileStatusService.setStatusAnalyse(response.getDossierId(), response.getFileId(), false); - log.info("Received message in {} for dossierId {} and fileId {}", MessagingConfiguration.CV_ANALYSIS_RESPONSE_QUEUE, response.getDossierId(), response.getFileId()); + log.info("Received message from exchange {} for dossierId {} and fileId {}", + MessagingConfiguration.CV_ANALYSIS_RESPONSE_EXCHANGE, + response.getDossierId(), + response.getFileId()); } @@ -48,11 +53,11 @@ public class CvAnalysisMessageReceiver { var response = objectMapper.readValue(failedMessage.getBody(), CvAnalysisServiceResponse.class); addFileIdToTrace(response.getFileId()); - log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_DLQ, response.getDossierId(), response.getFileId()); + log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_DLQ, response.getDossierId(), response.getFileId()); fileStatusProcessingUpdateService.analysisFailed(response.getDossierId(), response.getFileId(), new FileErrorInfo("cv analysis failed", - MessagingConfiguration.IMAGE_SERVICE_DLQ, + MessagingConfiguration.IMAGE_DLQ, "cv-analysis-service-v1", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ImageMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ImageMessageReceiver.java index a1807c6b5..486869eaa 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ImageMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/ImageMessageReceiver.java @@ -27,6 +27,8 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class ImageMessageReceiver { + public static final String IMAGE_RESPONSE_LISTENER_ID = "image-response-listener"; + private final FileStatusService fileStatusService; private final ObjectMapper objectMapper; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @@ -34,7 +36,7 @@ public class ImageMessageReceiver { @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.IMAGE_SERVICE_RESPONSE_QUEUE) + @RabbitListener(id = IMAGE_RESPONSE_LISTENER_ID) public void receive(Message message) { JsonNode imageResponse = objectMapper.readTree(message.getBody()); @@ -46,12 +48,11 @@ public class ImageMessageReceiver { fileStatusService.setStatusAnalyse(dossierId, fileId, false); - log.info("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_RESPONSE_QUEUE, dossierId, fileId); - + log.info("Received message from exchange {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_RESPONSE_EXCHANGE, dossierId, fileId); } - @RabbitListener(queues = MessagingConfiguration.IMAGE_SERVICE_DLQ) + @RabbitListener(queues = MessagingConfiguration.IMAGE_DLQ) public void handleDLQMessage(Message failedMessage) throws IOException { HashMap imageResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() { @@ -60,11 +61,11 @@ public class ImageMessageReceiver { String fileId = (String) imageResponse.get("fileId"); addFileIdToTrace(fileId); - log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_DLQ, dossierId, fileId); + log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_DLQ, dossierId, fileId); fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId, new FileErrorInfo("image analysis failed", - MessagingConfiguration.IMAGE_SERVICE_DLQ, + MessagingConfiguration.IMAGE_DLQ, "image-service-v3", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java index 68b54cfd0..d6bf709d7 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/LayoutParsingFinishedMessageReceiver.java @@ -1,6 +1,6 @@ package com.iqser.red.service.persistence.management.v1.processor.service.queue; -import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.LAYOUT_PARSING_DLQ; +import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_DLQ; import java.time.OffsetDateTime; import java.time.temporal.ChronoUnit; @@ -35,6 +35,8 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class LayoutParsingFinishedMessageReceiver { + public static final String LAYOUT_PARSING_RESPONSE_LISTENER_ID = "layout-parsing-response-listener"; + private final FileStatusService fileStatusService; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; private final ObjectMapper objectMapper; @@ -45,12 +47,12 @@ public class LayoutParsingFinishedMessageReceiver { @SneakyThrows - @RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE) + @RabbitListener(id = LAYOUT_PARSING_RESPONSE_LISTENER_ID) public void receive(LayoutParsingFinishedEvent response) { var dossierId = QueueMessageIdentifierService.parseDossierId(response.identifier()); var fileId = QueueMessageIdentifierService.parseFileId(response.identifier()); - log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE); + log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE); if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(response.identifier()))) { saasMigrationService.handleLayoutParsingFinished(QueueMessageIdentifierService.parseDossierId(response.identifier()), QueueMessageIdentifierService.parseFileId(response.identifier())); @@ -74,6 +76,7 @@ public class LayoutParsingFinishedMessageReceiver { throw e; } + log.info("Received message {} from exchange {}", response, LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE); } @@ -91,7 +94,7 @@ public class LayoutParsingFinishedMessageReceiver { saasMigrationService.handleError(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()), QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()), errorCause, - LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE); + LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE); return; } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java index 6d32f55ca..ae5a48f75 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/NerMessageReceiver.java @@ -29,6 +29,8 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class NerMessageReceiver { + public static final String ENTITY_RESPONSE_LISTENER_ID = "entity-response-listener"; + private final FileStatusService fileStatusService; private final ObjectMapper objectMapper; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @@ -36,7 +38,7 @@ public class NerMessageReceiver { @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.NER_SERVICE_RESPONSE_QUEUE) + @RabbitListener(id = ENTITY_RESPONSE_LISTENER_ID) public void receive(Message message) { HashMap entityResponse = objectMapper.readValue(message.getBody(), new TypeReference<>() { @@ -46,7 +48,7 @@ public class NerMessageReceiver { String fileId = (String) entityResponse.get("fileId"); addFileIdToTrace(fileId); - log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId); + log.info("Received message from exchange {} for dossierId {} and fileId {}", MessagingConfiguration.ENTITY_RESPONSE_EXCHANGE, dossierId, fileId); fileStatusService.setStatusAnalyse(dossierId, fileId, false); } @@ -64,7 +66,7 @@ public class NerMessageReceiver { } - @RabbitListener(queues = MessagingConfiguration.NER_SERVICE_DLQ) + @RabbitListener(queues = MessagingConfiguration.ENTITY_DLQ) public void handleDLQMessage(Message failedMessage) throws IOException { HashMap entityResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() { @@ -73,11 +75,11 @@ public class NerMessageReceiver { String fileId = (String) entityResponse.get("fileId"); addFileIdToTrace(fileId); - log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.NER_SERVICE_DLQ, dossierId, fileId); + log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.ENTITY_DLQ, dossierId, fileId); fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId, new FileErrorInfo("ner service failed", - MessagingConfiguration.NER_SERVICE_DLQ, + MessagingConfiguration.ENTITY_DLQ, "entity-recognition-service-v3", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); } @@ -87,7 +89,7 @@ public class NerMessageReceiver { public void handleLLMDLQMessage(Message failedMessage) throws IOException { LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class); - + String dossierId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier()); String fileId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier()); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java index f1b9b659e..ab369d5fb 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java @@ -29,6 +29,9 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class OCRProcessingMessageReceiver { + public static final String OCR_RESPONSE_LISTENER_ID = "ocr-response-listener"; + public static final String OCR_STATUS_UPDATE_LISTENER_ID = "ocr-status-update-listener"; + private final ObjectMapper objectMapper; private final FileStatusService fileStatusService; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @@ -36,7 +39,8 @@ public class OCRProcessingMessageReceiver { @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE) + @RabbitHandler + @RabbitListener(id = OCR_STATUS_UPDATE_LISTENER_ID) public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) { var fileModel = fileStatusService.getStatus(response.getFileId()); @@ -53,22 +57,23 @@ public class OCRProcessingMessageReceiver { response.getNumberOfOCRedPages()); } - log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE); + log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE); } @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL) + @RabbitHandler + @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_DLQ) public void handleOCRStatusUpdateDLQMessage(Message failedMessage) { var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class); - log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL); + log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_DLQ); } @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE) + @RabbitListener(id = OCR_RESPONSE_LISTENER_ID) public void handleOCRResponseMessage(Message successMessage) throws IOException { DocumentRequest ocrResponseMessage = objectMapper.readValue(successMessage.getBody(), DocumentRequest.class); @@ -80,7 +85,7 @@ public class OCRProcessingMessageReceiver { @RabbitHandler @RabbitListener(queues = MessagingConfiguration.OCR_DLQ) - public void handleOCRDQLMessage(Message failedMessage) throws IOException { + public void handleOCRDLQMessage(Message failedMessage) throws IOException { DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionAnalysisResponseReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionAnalysisResponseReceiver.java index 4ee9a7085..b93872fb9 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionAnalysisResponseReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionAnalysisResponseReceiver.java @@ -11,7 +11,6 @@ import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.entity.annotations.ManualRedactionEntryEntity; import com.iqser.red.service.persistence.management.v1.processor.entity.annotations.ManualResizeRedactionEntity; import com.iqser.red.service.persistence.management.v1.processor.entity.annotations.RectangleEntity; @@ -33,6 +32,8 @@ import lombok.extern.slf4j.Slf4j; @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class RedactionAnalysisResponseReceiver { + public static final String REDACTION_RESPONSE_LISTENER_ID = "redaction-response-listener"; + ObjectMapper objectMapper; AddRedactionPersistenceService addRedactionPersistenceService; ResizeRedactionPersistenceService resizeRedactionPersistenceService; @@ -40,7 +41,7 @@ public class RedactionAnalysisResponseReceiver { @SneakyThrows @RabbitHandler - @RabbitListener(queues = MessagingConfiguration.REDACTION_ANALYSIS_RESPONSE_QUEUE) + @RabbitListener(id = REDACTION_RESPONSE_LISTENER_ID) public void receive(Message message) throws JsonProcessingException { AnalyzeResponse analyzeResponse = objectMapper.readValue(message.getBody(), new TypeReference<>() { diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionServiceSaasMigrationMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionServiceSaasMigrationMessageReceiver.java index 6bb603668..a098afcf9 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionServiceSaasMigrationMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/RedactionServiceSaasMigrationMessageReceiver.java @@ -1,7 +1,7 @@ package com.iqser.red.service.persistence.management.v1.processor.service.queue; import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_DLQ; -import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_QUEUE; +import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_REQUEST_QUEUE; import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_RESPONSE_QUEUE; import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.X_ERROR_INFO_HEADER; @@ -46,7 +46,7 @@ public class RedactionServiceSaasMigrationMessageReceiver { if (errorCause == null) { errorCause = "Error occured during entityLog migration!"; } - saasMigrationService.handleError(migrationRequest.getDossierId(), migrationRequest.getFileId(), errorCause, MIGRATION_QUEUE); + saasMigrationService.handleError(migrationRequest.getDossierId(), migrationRequest.getFileId(), errorCause, MIGRATION_REQUEST_QUEUE); } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java new file mode 100644 index 000000000..4199caeab --- /dev/null +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/TenantExchangeMessageReceiverImpl.java @@ -0,0 +1,166 @@ +package com.iqser.red.service.persistence.management.v1.processor.service.queue; + +import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.*; +import static com.iqser.red.service.persistence.management.v1.processor.dataexchange.ExportDownloadMessageReceiver.EXPORT_DOWNLOAD_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadCompressionMessageReceiver.DOWNLOAD_COMPRESSION_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadMessageReceiver.DOWNLOAD_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadReportMessageReceiver.REPORT_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.download.RedactionResultMessageReceiver.PDFTRON_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AnalysisFlagsCalculationMessageReceiver.ANALYSIS_FLAG_CALCULATION_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AzureNerMessageReceiver.AZURE_ENTITY_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.CvAnalysisMessageReceiver.CV_ANALYSIS_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.ImageMessageReceiver.IMAGE_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.LayoutParsingFinishedMessageReceiver.LAYOUT_PARSING_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.NerMessageReceiver.ENTITY_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_STATUS_UPDATE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.RedactionAnalysisResponseReceiver.REDACTION_RESPONSE_LISTENER_ID; +import static com.iqser.red.service.persistence.management.v1.processor.service.queue.VisualLayoutParsingMessageReceiver.VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID; + +import java.util.Map; +import java.util.Set; + +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames; +import com.knecon.fforesight.tenantcommons.TenantProvider; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService; +import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver; + +@Service +public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver { + + public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) { + + super(rabbitQueueService, tenantProvider); + } + + + @Override + protected Set getTenantQueueConfigs() { + + return Set.of(TenantQueueConfiguration.builder() + .listenerId(DOWNLOAD_LISTENER_ID) + .exchangeName(DOWNLOAD_EXCHANGE) + .queuePrefix(DOWNLOAD_QUEUE_PREFIX) + .dlqName(DOWNLOAD_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(DOWNLOAD_COMPRESSION_LISTENER_ID) + .exchangeName(DOWNLOAD_COMPRESSION_EXCHANGE) + .queuePrefix(DOWNLOAD_COMPRESSION_QUEUE_PREFIX) + .dlqName(DOWNLOAD_COMPRESSION_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(EXPORT_DOWNLOAD_LISTENER_ID) + .exchangeName(EXPORT_DOWNLOAD_EXCHANGE) + .queuePrefix(EXPORT_DOWNLOAD_QUEUE_PREFIX) + .dlqName(EXPORT_DOWNLOAD_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(ANALYSIS_FLAG_CALCULATION_LISTENER_ID) + .exchangeName(ANALYSIS_FLAG_CALCULATION_EXCHANGE) + .queuePrefix(ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX) + .build(), + TenantQueueConfiguration.builder() + .listenerId(REDACTION_RESPONSE_LISTENER_ID) + .exchangeName(REDACTION_RESPONSE_EXCHANGE) + .queuePrefix(REDACTION_RESPONSE_QUEUE_PREFIX) + .dlqName(REDACTION_DLQ) + .arguments(Map.of("x-max-priority", 2)) + .build(), + TenantQueueConfiguration.builder() + .listenerId(PDFTRON_RESPONSE_LISTENER_ID) + .exchangeName(PDFTRON_RESPONSE_EXCHANGE) + .queuePrefix(PDFTRON_RESPONSE_QUEUE_PREFIX) + .dlqName(PDFTRON_DLQ) + .arguments(Map.of("x-max-priority", 2)) + .build(), + TenantQueueConfiguration.builder() + .listenerId(REPORT_RESPONSE_LISTENER_ID) + .exchangeName(REPORT_RESPONSE_EXCHANGE) + .queuePrefix(REPORT_RESPONSE_QUEUE_PREFIX) + .dlqName(REPORT_RESPONSE_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(OCR_RESPONSE_LISTENER_ID) + .exchangeName(OCR_RESPONSE_EXCHANGE) + .queuePrefix(OCR_RESPONSE_QUEUE_PREFIX) + .build(), + TenantQueueConfiguration.builder() + .listenerId(OCR_STATUS_UPDATE_LISTENER_ID) + .exchangeName(OCR_STATUS_UPDATE_EXCHANGE) + .queuePrefix(OCR_STATUS_UPDATE_QUEUE_PREFIX) + .dlqName(OCR_STATUS_UPDATE_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(IMAGE_RESPONSE_LISTENER_ID) + .exchangeName(IMAGE_RESPONSE_EXCHANGE) + .queuePrefix(IMAGE_RESPONSE_QUEUE_PREFIX) + .dlqName(IMAGE_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(ENTITY_RESPONSE_LISTENER_ID) + .exchangeName(ENTITY_RESPONSE_EXCHANGE) + .queuePrefix(ENTITY_RESPONSE_QUEUE_PREFIX) + .dlqName(ENTITY_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(AZURE_ENTITY_RESPONSE_LISTENER_ID) + .exchangeName(AZURE_ENTITY_RESPONSE_EXCHANGE) + .queuePrefix(AZURE_ENTITY_RESPONSE_QUEUE_PREFIX) + .dlqName(AZURE_ENTITY_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(CV_ANALYSIS_RESPONSE_LISTENER_ID) + .exchangeName(CV_ANALYSIS_RESPONSE_EXCHANGE) + .queuePrefix(CV_ANALYSIS_RESPONSE_QUEUE_PREFIX) + .dlqName(CV_ANALYSIS_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID) + .exchangeName(VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE) + .queuePrefix(VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX) + .dlqName(VISUAL_LAYOUT_PARSING_DLQ) + .build(), + TenantQueueConfiguration.builder() + .listenerId(LAYOUT_PARSING_RESPONSE_LISTENER_ID) + .exchangeName(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE) + .queuePrefix(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX) + .build()); + + } + + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + + System.out.println("application ready invoked"); + super.initializeQueues(); + } + + + @RabbitHandler + @RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}") + public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) { + + super.reactToTenantCreation(tenantCreatedEvent); + } + + + @RabbitHandler + @RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}") + public void reactToTenantDeletion(TenantResponse tenantResponse) { + + super.reactToTenantDeletion(tenantResponse); + + } + +} diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/VisualLayoutParsingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/VisualLayoutParsingMessageReceiver.java index bc70bf145..e9a0636c2 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/VisualLayoutParsingMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/VisualLayoutParsingMessageReceiver.java @@ -23,32 +23,37 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class VisualLayoutParsingMessageReceiver { + public static final String VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID = "visual-layout-parsing-response-listener"; + private final ObjectMapper objectMapper; private final FileStatusService fileStatusService; private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.VISUAL_LAYOUT_RESPONSE_QUEUE) + @RabbitListener(id = VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID) public void receive(VisualLayoutParsingServiceResponse response) { fileStatusService.setStatusAnalyse(response.getDossierId(), response.getFileId(), false); - log.info("Received message in {} for dossierId {} and fileId {}", MessagingConfiguration.VISUAL_LAYOUT_RESPONSE_QUEUE, response.getDossierId(), response.getFileId()); + log.info("Received message from exchange {} for dossierId {} and fileId {}", + MessagingConfiguration.VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE, + response.getDossierId(), + response.getFileId()); } @SneakyThrows - @RabbitListener(queues = MessagingConfiguration.VISUAL_LAYOUT_DLQ) + @RabbitListener(queues = MessagingConfiguration.VISUAL_LAYOUT_PARSING_DLQ) public void handleDLQMessage(Message failedMessage) { var response = objectMapper.readValue(failedMessage.getBody(), VisualLayoutParsingServiceResponse.class); - log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.VISUAL_LAYOUT_DLQ, response.getDossierId(), response.getFileId()); + log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.VISUAL_LAYOUT_PARSING_DLQ, response.getDossierId(), response.getFileId()); fileStatusProcessingUpdateService.analysisFailed(response.getDossierId(), response.getFileId(), new FileErrorInfo("table extractor failed", - MessagingConfiguration.VISUAL_LAYOUT_DLQ, + MessagingConfiguration.VISUAL_LAYOUT_PARSING_DLQ, "table-extractor", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))); diff --git a/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/utils/AbstractPersistenceServerServiceTest.java b/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/utils/AbstractPersistenceServerServiceTest.java index d14bf1f65..c256fbf99 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/utils/AbstractPersistenceServerServiceTest.java +++ b/persistence-service-v1/persistence-service-server-v1/src/test/java/com/iqser/red/service/peristence/v1/server/integration/utils/AbstractPersistenceServerServiceTest.java @@ -23,7 +23,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; import org.quartz.Scheduler; import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; @@ -245,7 +247,9 @@ public abstract class AbstractPersistenceServerServiceTest { @Autowired protected PrometheusMeterRegistry prometheusMeterRegistry; @MockBean - private AmqpAdmin amqpAdmin; + private RabbitAdmin rabbitAdmin; + @MockBean + private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @Autowired private DossierStatusRepository dossierStatusRepository; @Autowired @@ -351,7 +355,7 @@ public abstract class AbstractPersistenceServerServiceTest { when(usersClient.getAllUsers(true)).thenReturn(allUsers); // doNothing().when(pdfTronRedactionClient).testDigitalCurrentSignature(Mockito.any()); - when(amqpAdmin.getQueueInfo(Mockito.any())).thenReturn(null); + when(rabbitAdmin.getQueueInfo(Mockito.any())).thenReturn(null); when(entityLogService.getEntityLog(Mockito.any(), Mockito.any())).thenReturn(new EntityLog(1, 1, Lists.newArrayList(), null, 0, 0, 0, 0)); when(entityLogService.getEntityLog(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())).thenReturn(new EntityLog(1, 1, @@ -590,7 +594,8 @@ public abstract class AbstractPersistenceServerServiceTest { "MONGODB_USER=" + MONGO_USERNAME, "MONGODB_PASSWORD=" + MONGO_PASSWORD, "fforesight.jobs.enabled=false", - "fforesight.keycloak.enabled=false").applyTo(configurableApplicationContext.getEnvironment()); + "fforesight.keycloak.enabled=false", + "POD_NAME=persistence-service").applyTo(configurableApplicationContext.getEnvironment()); } diff --git a/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts b/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts index 20b2a0c95..747c42390 100644 --- a/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts +++ b/persistence-service-v1/persistence-service-shared-mongo-v1/build.gradle.kts @@ -9,7 +9,7 @@ dependencies { api(project(":persistence-service-shared-api-v1")) api("com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.2") api("com.google.guava:guava:31.1-jre") - api("com.knecon.fforesight:mongo-database-commons:0.10.0") + api("com.knecon.fforesight:mongo-database-commons:0.11.0") api("org.springframework.boot:spring-boot-starter-data-mongodb:${springBootStarterVersion}") api("org.springframework.boot:spring-boot-starter-validation:3.1.3") testImplementation("com.iqser.red.commons:test-commons:2.1.0")