RED-9331: Explore possibilities for fair upload / analysis processing per tenant

This commit is contained in:
Maverick Studer 2024-08-27 11:29:14 +02:00
parent bb4af4c1ce
commit 22bf27dd73
38 changed files with 733 additions and 400 deletions

View File

@ -5,7 +5,6 @@ import java.time.temporal.ChronoUnit;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
@ -39,7 +38,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
fileStatusProcessingUpdateService.preprocessingFailed(dossierId,
fileId,
new FileErrorInfo("preprocessing failed",
MessagingConfiguration.PRE_PROCESSING_DLQ,
MessagingConfiguration.PREPROCESSING_DLQ,
"pdftron-service",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}
@ -125,7 +124,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
fileStatusProcessingUpdateService.indexingFailed(dossierId,
fileId,
new FileErrorInfo("indexing failed",
MessagingConfiguration.INDEXING_DQL,
MessagingConfiguration.INDEXING_DLQ,
"search-service",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}

View File

@ -30,7 +30,7 @@ dependencies {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
}
api("com.knecon.fforesight:layoutparser-service-internal-api:0.149.0") {
api("com.knecon.fforesight:layoutparser-service-internal-api:0.161.0") {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
}
@ -45,7 +45,7 @@ dependencies {
implementation("com.knecon.fforesight:llm-service-api:1.10.0")
api("com.knecon.fforesight:jobs-commons:0.10.0")
api("com.knecon.fforesight:database-tenant-commons:0.24.0")
api("com.knecon.fforesight:keycloak-commons:0.29.0")
api("com.knecon.fforesight:keycloak-commons:0.30.0")
api("com.knecon.fforesight:tracing-commons:0.5.0")
api("com.knecon.fforesight:swagger-commons:0.7.0")
api("com.giffing.bucket4j.spring.boot.starter:bucket4j-spring-boot-starter:0.4.0")

View File

@ -1,13 +1,14 @@
package com.iqser.red.service.persistence.management.v1.processor.configuration;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_DLQ;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames;
import com.knecon.fforesight.llm.service.QueueNames;
import lombok.RequiredArgsConstructor;
@ -16,45 +17,46 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class MessagingConfiguration {
public static final String REDACTION_QUEUE = "redactionQueue";
public static final String REDACTION_PRIORITY_QUEUE = "redactionPriorityQueue";
public static final String REDACTION_ANALYSIS_RESPONSE_QUEUE = "redactionAnalysisResponseQueue";
public static final String REDACTION_DQL = "redactionDQL";
// persistence-service
public static final String DOWNLOAD_QUEUE_PREFIX = "download_queue";
public static final String DOWNLOAD_EXCHANGE = "download_exchange";
public static final String DOWNLOAD_DLQ = "download_dlq";
public static final String DOWNLOAD_QUEUE = "downloadQueue";
public static final String DOWNLOAD_DLQ = "downloadDLQ";
public static final String DOWNLOAD_COMPRESSION_QUEUE = "download_compression_queue";
public static final String DOWNLOAD_COMPRESSION_QUEUE_PREFIX = "download_compression_queue";
public static final String DOWNLOAD_COMPRESSION_EXCHANGE = "download_compression_exchange";
public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_dlq";
public static final String EXPORT_DOWNLOAD_QUEUE = "exportDownloadQueue";
public static final String EXPORT_DOWNLOAD_DLQ = "exportDownloadDLQ";
public static final String EXPORT_DOWNLOAD_QUEUE_PREFIX = "export_download_queue";
public static final String EXPORT_DOWNLOAD_EXCHANGE = "export_download_exchange";
public static final String EXPORT_DOWNLOAD_DLQ = "export_download_dlq";
public static final String REPORT_QUEUE = "reportQueue";
public static final String REPORT_DLQ = "reportDLQ";
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX = "analysis_flag_calculation_queue";
public static final String ANALYSIS_FLAG_CALCULATION_EXCHANGE = "analysis_flag_calculation_exchange";
public static final String REPORT_RESULT_QUEUE = "reportResultQueue";
public static final String REPORT_RESULT_DLQ = "reportResultDLQ";
public static final String REDACTION_RESPONSE_QUEUE_PREFIX = "redaction_response_queue";
public static final String REDACTION_RESPONSE_EXCHANGE = "redaction_response_exchange";
public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
public static final String OCR_DLQ = "ocr_dead_letter_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue";
public static final String PDFTRON_RESPONSE_QUEUE_PREFIX = "pdftron_response_queue";
public static final String PDFTRON_RESPONSE_EXCHANGE = "pdftron_response_exchange";
public static final String INDEXING_QUEUE = "indexingQueue";
public static final String INDEXING_DQL = "indexingDQL";
public static final String REPORT_RESPONSE_QUEUE_PREFIX = "report_response_queue";
public static final String REPORT_RESPONSE_EXCHANGE = "report_response_exchange";
public static final String REPORT_RESPONSE_DLQ = "report_response_dlq";
public static final String DELETE_FROM_INDEX_QUEUE = "deleteFromIndexQueue";
public static final String DELETE_FROM_INDEX_DLQ = "deleteFromIndexDLQ";
public static final String OCR_RESPONSE_QUEUE_PREFIX = "ocr_response_queue";
public static final String OCR_RESPONSE_EXCHANGE = "ocr_response_exchange";
public static final String OCR_STATUS_UPDATE_QUEUE_PREFIX = "ocr_status_update_queue";
public static final String OCR_STATUS_UPDATE_EXCHANGE = "ocr_status_update_exchange";
public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_dlq";
public static final String IMAGE_SERVICE_QUEUE = "image_request_queue";
public static final String IMAGE_SERVICE_RESPONSE_QUEUE = "image_response_queue";
public static final String IMAGE_SERVICE_DLQ = "image_dead_letter_queue";
public static final String IMAGE_RESPONSE_QUEUE_PREFIX = "image_response_queue";
public static final String IMAGE_RESPONSE_EXCHANGE = "image_response_exchange";
public static final String NER_SERVICE_QUEUE = "entity_request_queue";
public static final String NER_SERVICE_RESPONSE_QUEUE = "entity_response_queue";
public static final String NER_SERVICE_DLQ = "entity_dead_letter_queue";
public static final String ENTITY_RESPONSE_QUEUE_PREFIX = "entity_response_queue";
public static final String ENTITY_RESPONSE_EXCHANGE = "entity_response_exchange";
public static final String AZURE_ENTITY_RESPONSE_QUEUE_PREFIX = "azure_entity_response_queue";
public static final String AZURE_ENTITY_RESPONSE_EXCHANGE = "azure_entity_response_exchange";
public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue";
public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue";
@ -64,104 +66,130 @@ public class MessagingConfiguration {
public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE;
public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ;
public static final String AZURE_NER_SERVICE_QUEUE = "azure_entity_request_queue";
public static final String AZURE_NER_SERVICE_RESPONSE_QUEUE = "azure_entity_response_queue";
public static final String AZURE_NER_SERVICE_DLQ = "azure_entity_dead_letter_queue";
public static final String CV_ANALYSIS_RESPONSE_QUEUE_PREFIX = "cv_analysis_response_queue";
public static final String CV_ANALYSIS_RESPONSE_EXCHANGE = "cv_analysis_response_exchange";
public static final String PRE_PROCESSING_QUEUE = "preprocessingQueue";
public static final String PRE_PROCESSING_DLQ = "preprocessingDLQ";
public static final String VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "visual_layout_parsing_response_queue";
public static final String VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE = "visual_layout_parsing_response_exchange";
public static final String PDFTRON_QUEUE = "pdftron_queue";
// pdftron-redaction-service
public static final String PREPROCESSING_EXCHANGE = "preprocessing_exchange";
public static final String PREPROCESSING_DLQ = "preprocessing_dlq";
public static final String PDFTRON_REQUEST_EXCHANGE = "pdftron_request_exchange";
public static final String PDFTRON_DLQ = "pdftron_dlq";
public static final String PDFTRON_RESULT_QUEUE = "pdftron_result_queue";
public static final String CV_ANALYSIS_QUEUE = "cv_analysis_request_queue";
public static final String CV_ANALYSIS_RESPONSE_QUEUE = "cv_analysis_response_queue";
public static final String CV_ANALYSIS_DLQ = "cv_analysis_dead_letter_queue";
// redaction-service
public static final String REDACTION_REQUEST_QUEUE_PREFIX = "redaction_request_queue";
public static final String REDACTION_REQUEST_EXCHANGE = "redaction_request_exchange";
public static final String REDACTION_PRIORITY_REQUEST_EXCHANGE = "redaction_priority_request_exchange";
public static final String REDACTION_DLQ = "redaction_dlq";
public static final String VISUAL_LAYOUT_PARSING_QUEUE = "visual_layout_parsing_service_queue";
public static final String VISUAL_LAYOUT_RESPONSE_QUEUE = "visual_layout_parsing_service_response_queue";
public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue";
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue";
// redaction-report-service
public static final String REPORT_REQUEST_EXCHANGE = "report_request_exchange";
public static final String REPORT_REQUEST_DLQ = "report_request_dlq";
// search-service
public static final String INDEXING_EXCHANGE = "indexing_exchange";
public static final String INDEXING_DLQ = "indexing_dlq";
public static final String DELETE_FROM_INDEX_EXCHANGE = "delete_from_index_exchange";
public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_dlq";
// ocr-services
public static final String OCR_REQUEST_EXCHANGE = "ocr_request_exchange";
public static final String OCR_DLQ = "ocr_dlq";
// image-service
public static final String IMAGE_REQUEST_EXCHANGE = "image_request_exchange";
public static final String IMAGE_DLQ = "image_dlq";
// entity-recognition-service
public static final String ENTITY_REQUEST_EXCHANGE = "entity_request_exchange";
public static final String ENTITY_DLQ = "entity_dlq";
// azure-ner-service
public static final String AZURE_ENTITY_REQUEST_EXCHANGE = "azure_entity_request_exchange";
public static final String AZURE_ENTITY_DLQ = "azure_entity_dlq";
// cv-analysis-service
public static final String CV_ANALYSIS_REQUEST_EXCHANGE = "cv_analysis_request_exchange";
public static final String CV_ANALYSIS_DLQ = "cv_analysis_dlq";
// visual-layout-parsing-service
public static final String VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE = "visual_layout_parsing_request_exchange";
public static final String VISUAL_LAYOUT_PARSING_DLQ = "visual_layout_parsing_dlq";
public static final String X_ERROR_INFO_HEADER = "x-error-message";
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dead_letter_queue";
// --- Saas Migration, can be removed later ----
public static final String MIGRATION_QUEUE = "migrationQueue";
public static final String MIGRATION_DLQ = "migrationDLQ";
// ---- Saas Migration, can be removed later ----
public static final String MIGRATION_REQUEST_QUEUE = "migrationQueue";
public static final String MIGRATION_RESPONSE_QUEUE = "migrationResponseQueue";
public static final String MIGRATION_DLQ = "migrationDLQ";
// ---- persistence-service ----
@Bean
public Queue migrationQueue() {
public DirectExchange downloadExchange() {
return QueueBuilder.durable(MIGRATION_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", MIGRATION_DLQ).maxPriority(2).build();
return new DirectExchange(DOWNLOAD_EXCHANGE);
}
@Bean
public Queue migrationDLQ() {
public Queue downloadDeadLetterQueue() {
return QueueBuilder.durable(MIGRATION_DLQ).build();
return QueueBuilder.durable(DOWNLOAD_DLQ).build();
}
@Bean
public Queue migrationResponseQueue() {
public DirectExchange downloadCompressionExchange() {
return QueueBuilder.durable(MIGRATION_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", MIGRATION_DLQ)
.maxPriority(2)
.build();
}
// --- End Saas Migration
@Bean
public Queue nerRequestQueue() {
return QueueBuilder.durable(NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", NER_SERVICE_DLQ).build();
return new DirectExchange(DOWNLOAD_COMPRESSION_EXCHANGE);
}
@Bean
public Queue nerResponseQueue() {
public Queue downloadCompressionQueueDLQ() {
return QueueBuilder.durable(NER_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", NER_SERVICE_DLQ).build();
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_DLQ).build();
}
@Bean
public Queue nerResponseDLQ() {
public DirectExchange exportDownloadExchange() {
return QueueBuilder.durable(NER_SERVICE_DLQ).build();
return new DirectExchange(EXPORT_DOWNLOAD_EXCHANGE);
}
@Bean
public Queue azureNerRequestQueue() {
public Queue exportDownloadDeadLetterQueue() {
return QueueBuilder.durable(AZURE_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", AZURE_NER_SERVICE_DLQ).build();
return QueueBuilder.durable(EXPORT_DOWNLOAD_DLQ).build();
}
@Bean
public Queue azureNerResponseQueue() {
public DirectExchange analysisFlagCalculationExchange() {
return QueueBuilder.durable(AZURE_NER_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", AZURE_NER_SERVICE_DLQ).build();
return new DirectExchange(ANALYSIS_FLAG_CALCULATION_EXCHANGE);
}
@Bean
public Queue azureNerResponseDLQ() {
public DirectExchange redactionResponseExchange() {
return QueueBuilder.durable(AZURE_NER_SERVICE_DLQ).build();
return new DirectExchange(REDACTION_RESPONSE_EXCHANGE);
}
@Bean
public DirectExchange reportResponseExchange() {
return new DirectExchange(REPORT_RESPONSE_EXCHANGE);
}
@ -214,130 +242,160 @@ public class MessagingConfiguration {
@Bean
public Queue imageRequestQueue() {
public Queue reportResponseDLQ() {
return QueueBuilder.durable(IMAGE_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", IMAGE_SERVICE_DLQ).build();
return QueueBuilder.durable(REPORT_RESPONSE_DLQ).build();
}
@Bean
public Queue imageResponseQueue() {
public DirectExchange ocrResponseExchange() {
return QueueBuilder.durable(IMAGE_SERVICE_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", IMAGE_SERVICE_DLQ).build();
return new DirectExchange(OCR_RESPONSE_EXCHANGE);
}
@Bean
public Queue imageResponseDLQ() {
public DirectExchange imageResponseExchange() {
return QueueBuilder.durable(IMAGE_SERVICE_DLQ).build();
return new DirectExchange(IMAGE_RESPONSE_EXCHANGE);
}
@Bean
public Queue cvAnalysisRequestQueue() {
public DirectExchange entityResponseExchange() {
return QueueBuilder.durable(CV_ANALYSIS_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CV_ANALYSIS_DLQ).build();
return new DirectExchange(ENTITY_RESPONSE_EXCHANGE);
}
@Bean
public DirectExchange azureEntityResponseExchange() {
return new DirectExchange(AZURE_ENTITY_RESPONSE_EXCHANGE);
}
@Bean
public Queue cvAnalysisResponseQueue() {
public DirectExchange cvAnalysisResponseExchange() {
return QueueBuilder.durable(CV_ANALYSIS_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CV_ANALYSIS_DLQ).build();
return new DirectExchange(CV_ANALYSIS_RESPONSE_EXCHANGE);
}
@Bean
public Queue cvAnalysisResponseDLQ() {
public DirectExchange visualLayoutParsingResponseExchange() {
return QueueBuilder.durable(CV_ANALYSIS_DLQ).build();
return new DirectExchange(VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE);
}
@Bean
public Queue visualLayoutParsingRequestQueue() {
public DirectExchange layoutParsingResponseExchange() {
return QueueBuilder.durable(VISUAL_LAYOUT_PARSING_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", VISUAL_LAYOUT_DLQ).build();
return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE);
}
// ---- pdftron-redaction-service ----
@Bean
public DirectExchange preprocessingExchange() {
return new DirectExchange(PREPROCESSING_EXCHANGE);
}
@Bean
public Queue visualLayoutParsingResponseQueue() {
public Queue preprocessingDeadLetterQueue() {
return QueueBuilder.durable(VISUAL_LAYOUT_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", VISUAL_LAYOUT_DLQ).build();
return QueueBuilder.durable(PREPROCESSING_DLQ).build();
}
@Bean
public Queue visualLayoutParsingDLQ() {
public DirectExchange pdftronResponseExchange() {
return QueueBuilder.durable(VISUAL_LAYOUT_DLQ).build();
return new DirectExchange(PDFTRON_RESPONSE_EXCHANGE);
}
@Bean
public Queue ocrStatusUpdateResponseQueue() {
public Queue pdfTronDLQ() {
return QueueBuilder.durable(OCR_STATUS_UPDATE_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", OCR_STATUS_UPDATE_RESPONSE_DQL)
.build();
return QueueBuilder.durable(PDFTRON_DLQ).build();
}
// ---- redaction-service ----
@Bean
public DirectExchange redactionRequestExchange() {
return new DirectExchange(REDACTION_REQUEST_EXCHANGE);
}
@Bean
public Queue ocrStatusUpdateResponseDQL() {
public DirectExchange redactionPriorityRequestExchange() {
return QueueBuilder.durable(OCR_STATUS_UPDATE_RESPONSE_DQL).build();
return new DirectExchange(REDACTION_PRIORITY_REQUEST_EXCHANGE);
}
@Bean
public Queue redactionQueue() {
public Queue redactionDLQ() {
return QueueBuilder.durable(REDACTION_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", REDACTION_DQL).maxPriority(2).build();
return QueueBuilder.durable(REDACTION_DLQ).build();
}
// ---- redaction-report-service ----
@Bean
public DirectExchange reportRequestExchange() {
return new DirectExchange(REPORT_REQUEST_EXCHANGE);
}
@Bean
public Queue redactionPriorityQueue() {
public Queue reportRequestDLQ() {
return QueueBuilder.durable(REDACTION_PRIORITY_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", REDACTION_DQL)
.maxPriority(2)
.build();
return QueueBuilder.durable(REPORT_REQUEST_DLQ).build();
}
// ---- search-service ----
@Bean
public DirectExchange indexingExchange() {
return new DirectExchange(INDEXING_EXCHANGE);
}
@Bean
public Queue redactionAnalysisResponseQueue() {
public Queue indexingDLQ() {
return QueueBuilder.durable(REDACTION_ANALYSIS_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", REDACTION_DQL)
.maxPriority(2)
.build();
return QueueBuilder.durable(INDEXING_DLQ).build();
}
@Bean
public Queue ocrRequestQueue() {
public DirectExchange deleteFromIndexExchange() {
return QueueBuilder.durable(OCR_REQUEST_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", OCR_DLQ)
.withArgument("x-max-priority", 2) // Higher value is higher priority.
.maxPriority(2)
.build();
return new DirectExchange(DELETE_FROM_INDEX_EXCHANGE);
}
@Bean
public Queue ocrResponseQueue() {
public Queue deleteFromIndexDLQ() {
return QueueBuilder.durable(OCR_RESPONSE_QUEUE).build();
return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build();
}
// ---- ocr-services ----
@Bean
public DirectExchange ocrRequestExchange() {
return new DirectExchange(OCR_REQUEST_EXCHANGE);
}
@ -349,191 +407,134 @@ public class MessagingConfiguration {
@Bean
public Queue downloadQueue() {
public DirectExchange ocrStatusUpdateExchange() {
return QueueBuilder.durable(DOWNLOAD_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", DOWNLOAD_DLQ).build();
return new DirectExchange(OCR_STATUS_UPDATE_EXCHANGE);
}
@Bean
public Queue downloadDeadLetterQueue() {
public Queue ocrStatusUpdateDLQ() {
return QueueBuilder.durable(DOWNLOAD_DLQ).build();
return QueueBuilder.durable(OCR_STATUS_UPDATE_DLQ).build();
}
// ---- image-service ----
@Bean
public DirectExchange imageRequestExchange() {
return new DirectExchange(IMAGE_REQUEST_EXCHANGE);
}
@Bean
public Queue downloadCompressionQueue() {
public Queue imageDLQ() {
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", DOWNLOAD_COMPRESSION_DLQ)
.build();
return QueueBuilder.durable(IMAGE_DLQ).build();
}
// ---- entity-recognition-service ----
@Bean
public DirectExchange nerRequestExchange() {
return new DirectExchange(ENTITY_REQUEST_EXCHANGE);
}
@Bean
public Queue downloadCompressionQueueDLQ() {
public Queue nerDLQ() {
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_DLQ).build();
return QueueBuilder.durable(ENTITY_DLQ).build();
}
// ---- azure-ner-service ----
@Bean
public DirectExchange azureNerRequestExchange() {
return new DirectExchange(AZURE_ENTITY_REQUEST_EXCHANGE);
}
@Bean
public Queue exportDownloadQueue() {
public Queue azureNerDLQ() {
return QueueBuilder.durable(EXPORT_DOWNLOAD_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", EXPORT_DOWNLOAD_DLQ).build();
return QueueBuilder.durable(AZURE_ENTITY_DLQ).build();
}
// ---- cv-analysis-service ----
@Bean
public DirectExchange cvAnalysisRequestExchange() {
return new DirectExchange(CV_ANALYSIS_REQUEST_EXCHANGE);
}
@Bean
public Queue exportDownloadDeadLetterQueue() {
public Queue cvAnalysisDLQ() {
return QueueBuilder.durable(EXPORT_DOWNLOAD_DLQ).build();
return QueueBuilder.durable(CV_ANALYSIS_DLQ).build();
}
// ---- visual-layout-parsing-service ----
@Bean
public DirectExchange visualLayoutParsingRequestExchange() {
return new DirectExchange(VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE);
}
@Bean
public Queue reportQueue() {
public Queue visualLayoutParsingDLQ() {
return QueueBuilder.durable(REPORT_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", REPORT_DLQ)
.withArgument("x-max-priority", 2)
.maxPriority(2)
.build();
return QueueBuilder.durable(VISUAL_LAYOUT_PARSING_DLQ).build();
}
// ---- layoutparser-service ----
@Bean
public DirectExchange layoutParsingRequestExchange() {
return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE);
}
@Bean
public Queue reportDeadLetterQueue() {
return QueueBuilder.durable(REPORT_DLQ).build();
}
@Bean
public Queue reportResultQueue() {
return QueueBuilder.durable(REPORT_RESULT_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", REPORT_RESULT_DLQ).build();
}
@Bean
public Queue reportResultDeadLetterQueue() {
return QueueBuilder.durable(REPORT_RESULT_DLQ).build();
}
@Bean
public Queue analysisFlagCalculationQueue() {
return QueueBuilder.durable(ANALYSIS_FLAG_CALCULATION_QUEUE).build();
}
@Bean
public Queue indexingQueue() {
return QueueBuilder.durable(INDEXING_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", INDEXING_DQL).maxPriority(2).build();
}
@Bean
public Queue indexingDeadLetterQueue() {
return QueueBuilder.durable(INDEXING_DQL).build();
}
@Bean
public Queue deleteFromIndexQueue() {
return QueueBuilder.durable(DELETE_FROM_INDEX_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", DELETE_FROM_INDEX_DLQ)
.maxPriority(2)
.build();
}
@Bean
public Queue deleteFromIndexDLQ() {
return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build();
}
@Bean
public Queue preprocessingQueue() {
return QueueBuilder.durable(PRE_PROCESSING_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PRE_PROCESSING_DLQ)
.withArgument("x-max-priority", 2)
.maxPriority(2)
.build();
}
@Bean
public Queue preprocessingDeadLetterQueue() {
return QueueBuilder.durable(PRE_PROCESSING_DLQ).build();
}
@Bean
public Queue pdfTronQueue() {
return QueueBuilder.durable(PDFTRON_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PDFTRON_DLQ)
.withArgument("x-max-priority", 2)
.maxPriority(2)
.build();
}
@Bean
public Queue pdfTronDlq() {
return QueueBuilder.durable(PDFTRON_DLQ).build();
}
@Bean
public Queue pdfTronResultQueue() {
return QueueBuilder.durable(PDFTRON_RESULT_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PDFTRON_DLQ)
.withArgument("x-max-priority", 2)
.maxPriority(2)
.build();
}
@Bean
public Queue layoutparsingRequestQueue() {
return QueueBuilder.durable(LAYOUT_PARSING_REQUEST_QUEUE)//
.withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LAYOUT_PARSING_DLQ).build();
}
@Bean
public Queue layoutparsingResponseQueue() {
return QueueBuilder.durable(LAYOUT_PARSING_FINISHED_EVENT_QUEUE).build();
}
@Bean
public Queue layoutparsingDLQ() {
public Queue layoutParsingDLQ() {
return QueueBuilder.durable(LAYOUT_PARSING_DLQ).build();
}
// ---- Saas Migration ----
@Bean
public Queue migrationQueue() {
return QueueBuilder.durable(MIGRATION_REQUEST_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", MIGRATION_DLQ)
.maxPriority(2)
.build();
}
@Bean
public Queue migrationDLQ() {
return QueueBuilder.durable(MIGRATION_DLQ).build();
}
@Bean
public Queue migrationResponseQueue() {
return QueueBuilder.durable(MIGRATION_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", MIGRATION_DLQ)
.maxPriority(2)
.build();
}
}

View File

@ -0,0 +1,10 @@
package com.iqser.red.service.persistence.management.v1.processor.configuration;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration;
@Configuration
public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration {
}

View File

@ -22,7 +22,7 @@ public class UserMessagingConfiguration {
public static final String PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE = "persistence-service-user-status-changed-queue";
public static final String PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE = "persistence-service-user-roles-updated-queue";
public static final String PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE = "persistence-service-user-own-profile-updated-queue";
public static final String PERSISTENCE_SERVICE_USER_EVENTS_DQL = "persistence-service-user-events-dql";
public static final String PERSISTENCE_SERVICE_USER_EVENTS_DLQ = "persistence-service-user-events-dlq";
@Bean("persistenceServiceUserRolesUpdatedQueue")
@ -30,7 +30,7 @@ public class UserMessagingConfiguration {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL)
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ)
.build();
}
@ -48,7 +48,7 @@ public class UserMessagingConfiguration {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL)
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ)
.build();
}
@ -66,7 +66,7 @@ public class UserMessagingConfiguration {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_UPDATED_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL)
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ)
.build();
}
@ -84,7 +84,7 @@ public class UserMessagingConfiguration {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_DELETED_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL)
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ)
.build();
}
@ -102,7 +102,7 @@ public class UserMessagingConfiguration {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL)
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ)
.build();
}
@ -120,7 +120,7 @@ public class UserMessagingConfiguration {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_CREATED_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DQL)
.withArgument("x-dead-letter-routing-key", PERSISTENCE_SERVICE_USER_EVENTS_DLQ)
.build();
}
@ -136,7 +136,7 @@ public class UserMessagingConfiguration {
@Bean
public Queue persistenceServiceUserEventsDLQ() {
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_EVENTS_DQL).build();
return QueueBuilder.durable(PERSISTENCE_SERVICE_USER_EVENTS_DLQ).build();
}

View File

@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class ExportDownloadMessageReceiver {
public static final String EXPORT_DOWNLOAD_LISTENER_ID = "export-download-listener";
static long MB = 1024 * 1024;
DossierTemplateExportService dossierTemplateService;
FileExchangeExportService fileExchangeExportService;
@ -36,7 +37,7 @@ public class ExportDownloadMessageReceiver {
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.EXPORT_DOWNLOAD_QUEUE)
@RabbitListener(id = EXPORT_DOWNLOAD_LISTENER_ID)
public void receive(ExportDownloadMessage downloadJob) throws JsonProcessingException {
var download = downloadStatusPersistenceService.getStatus(downloadJob.getStorageId());

View File

@ -64,6 +64,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.DossierStatusInfo;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileAttributeConfig;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.legalbasis.LegalBasis;
import com.knecon.fforesight.tenantcommons.TenantContext;
import io.micrometer.observation.annotation.Observed;
import lombok.AccessLevel;
@ -118,7 +119,7 @@ public class DossierTemplateExportService {
private void addToExportDownloadQueue(ExportDownloadMessage downloadJob, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_QUEUE, downloadJob, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
message.getMessageProperties().setPriority(priority);
return message;
});

View File

@ -22,6 +22,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.DownloadRes
import com.iqser.red.service.persistence.service.v1.api.shared.model.FileExchangeExportRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.Dossier;
import com.knecon.fforesight.keycloakcommons.security.KeycloakSecurity;
import com.knecon.fforesight.tenantcommons.TenantContext;
import io.micrometer.observation.annotation.Observed;
import lombok.AccessLevel;
@ -74,7 +75,7 @@ public class FileExchangeExportService {
private void addToExportDownloadQueue(ExportDownloadMessage downloadJob) {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_QUEUE, downloadJob, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
message.getMessageProperties().setPriority(1);
return message;
});

View File

@ -1,7 +1,6 @@
package com.iqser.red.service.persistence.management.v1.processor.migration;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_QUEUE;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_REQUEST_QUEUE;
import java.time.OffsetDateTime;
import java.util.List;
@ -39,6 +38,7 @@ import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist;
import com.iqser.red.storage.commons.service.StorageService;
import com.knecon.fforesight.databasetenantcommons.providers.TenantSyncService;
import com.knecon.fforesight.databasetenantcommons.providers.events.TenantSyncEvent;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames;
import com.knecon.fforesight.tenantcommons.TenantContext;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.UpdateDetailsRequest;
@ -125,7 +125,7 @@ public class SaasMigrationService implements TenantSyncService {
var layoutParsingRequest = layoutParsingRequestFactory.build(file.getDossierId(), file.getFileId(), false);
rabbitTemplate.convertAndSend(LAYOUT_PARSING_REQUEST_QUEUE, layoutParsingRequest);
rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), layoutParsingRequest);
numberOfFiles++;
@ -160,7 +160,7 @@ public class SaasMigrationService implements TenantSyncService {
log.info("Starting Migration for dossierId {} and fileId {}", dossierId, fileId);
saasMigrationStatusPersistenceService.createMigrationRequiredStatus(dossierId, fileId);
var layoutParsingRequest = layoutParsingRequestFactory.build(dossierId, fileId, false);
rabbitTemplate.convertAndSend(LAYOUT_PARSING_REQUEST_QUEUE, layoutParsingRequest);
rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), layoutParsingRequest);
}
@ -183,7 +183,7 @@ public class SaasMigrationService implements TenantSyncService {
String dossierTemplateId = dossierService.getDossierById(dossierId).getDossierTemplateId();
rabbitTemplate.convertAndSend(MIGRATION_QUEUE,
rabbitTemplate.convertAndSend(MIGRATION_REQUEST_QUEUE,
MigrationRequest.builder()
.dossierTemplateId(dossierTemplateId)
.dossierId(dossierId)
@ -230,13 +230,13 @@ public class SaasMigrationService implements TenantSyncService {
}
public void handleError(String dossierId, String fileId, String errorCause, String retryQueue) {
public void handleError(String dossierId, String fileId, String errorCause, String retryExchange) {
var migrationEntry = saasMigrationStatusPersistenceService.findById(fileId);
Integer numErrors = migrationEntry.getProcessingErrorCounter();
if (numErrors != null && numErrors <= settings.getMaxErrorRetries()) {
saasMigrationStatusPersistenceService.updateErrorCounter(fileId, numErrors + 1, errorCause);
rabbitTemplate.convertAndSend(retryQueue, MigrationRequest.builder().dossierId(dossierId).fileId(fileId).build());
rabbitTemplate.convertAndSend(retryExchange, TenantContext.getTenantId(), MigrationRequest.builder().dossierId(dossierId).fileId(fileId).build());
log.error("Retrying error during saas migration for tenant {} dossier {} and file {}, cause {}", TenantContext.getTenantId(), dossierId, fileId, errorCause);
} else {
saasMigrationStatusPersistenceService.updateErrorStatus(fileId, errorCause);

View File

@ -0,0 +1,101 @@
package com.iqser.red.service.persistence.management.v1.processor.migration.migrations;
import java.util.List;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.stereotype.Service;
import com.iqser.red.service.persistence.management.v1.processor.migration.Migration;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Setter
@Service
public class QueueRenameMigration23 extends Migration {
private final AmqpAdmin amqpAdmin;
private static final String NAME = "Migration for renaming of most queues";
private static final long VERSION = 23;
private static final List<String> queueNames = List.of("analysis_flag_calculation_queue",
"cv_analysis_dead_letter_queue",
"cv_analysis_request_queue",
"cv_analysis_response_queue",
"deleteFromIndexDLQ",
"deleteFromIndexQueue",
"downloadDLQ",
"downloadQueue",
"download_compression_dlq",
"download_compression_queue",
"entity_dead_letter_queue",
"entity_request_queue",
"entity_response_queue",
"exportDownloadDLQ",
"exportDownloadQueue",
"image_dead_letter_queue",
"image_request_queue",
"image_response_queue",
"indexingDQL",
"indexingQueue",
"layout_parsing_dead_letter_queue",
"layout_parsing_request_queue",
"layout_parsing_response_queue",
"migrationDLQ",
"migrationQueue",
"migrationResponseQueue",
"mongo-tenant-created",
"mongo-tenant-created-dlq",
"ocr_dead_letter_queue",
"ocr_request_queue",
"ocr_response_queue",
"ocr_status_update_dead_letter_queue",
"ocr_status_update_response_queue",
"pdftron_dlq",
"pdftron_queue",
"pdftron_result_queue",
"persistence-service-user-created-queue",
"persistence-service-user-deleted-queue",
"persistence-service-user-events-dql",
"persistence-service-user-own-profile-updated-queue",
"persistence-service-user-roles-updated-queue",
"persistence-service-user-status-changed-queue",
"persistence-service-user-updated-queue",
"preprocessingDLQ",
"preprocessingQueue",
"redactionAnalysisResponseQueue",
"redactionDQL",
"redactionPriorityQueue",
"redactionQueue",
"reportDLQ",
"reportQueue",
"reportResultDLQ",
"reportResultQueue",
"tenant-created",
"tenant-created-dlq",
"tenant-delete-dlq",
"tenant-delete-queue",
"tenant-sync",
"tenant-sync-dlq",
"visual_layout_parsing_service_dead_letter_queue",
"visual_layout_parsing_service_queue",
"visual_layout_parsing_service_response_queue");
public QueueRenameMigration23(AmqpAdmin amqpAdmin) {
super(NAME, VERSION);
this.amqpAdmin = amqpAdmin;
}
@Override
protected void migrate() {
log.info("Migration: Deleting queues...");
queueNames.forEach(amqpAdmin::deleteQueue);
}
}

View File

@ -28,6 +28,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatus;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadWithOptionRequest;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -139,7 +140,7 @@ public class DownloadService {
private void addToDownloadQueue(DownloadJob downloadJob, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_QUEUE, downloadJob, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
message.getMessageProperties().setPriority(priority);
return message;
});

View File

@ -1,46 +1,41 @@
package com.iqser.red.service.persistence.management.v1.processor.service;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.iqser.red.service.pdftron.redaction.v1.api.model.ApplicationType;
import com.iqser.red.service.persistence.management.v1.processor.exception.BadRequestException;
import com.iqser.red.service.persistence.management.v1.processor.exception.NotFoundException;
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus;
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
import com.iqser.red.service.pdftron.redaction.v1.api.model.ApplicationType;
import com.iqser.red.service.pdftron.redaction.v1.api.model.ProcessUntouchedDocumentRequest;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileAttributeEntity;
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileEntity;
import com.iqser.red.service.persistence.management.v1.processor.exception.BadRequestException;
import com.iqser.red.service.persistence.management.v1.processor.exception.InternalServerErrorException;
import com.iqser.red.service.persistence.management.v1.processor.exception.NotFoundException;
import com.iqser.red.service.persistence.management.v1.processor.model.CvAnalysisServiceRequest;
import com.iqser.red.service.persistence.management.v1.processor.model.FileIdentifier;
import com.iqser.red.service.persistence.management.v1.processor.model.ManualChangesQueryOptions;
import com.iqser.red.service.persistence.service.v1.api.shared.model.NerServiceRequest;
import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse;
import com.iqser.red.service.persistence.management.v1.processor.model.VisualLayoutParsingServiceRequest;
import com.iqser.red.service.persistence.management.v1.processor.model.image.ImageServiceRequest;
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.AnalyseStatus;
import com.iqser.red.service.persistence.management.v1.processor.model.websocket.FileEventType;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.ChunkingRequestFactory;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.LayoutParsingRequestFactory;
import com.iqser.red.service.persistence.management.v1.processor.service.layoutparsing.QueueMessageIdentifierService;
import com.iqser.red.service.persistence.management.v1.processor.service.manualredactions.ManualRedactionProviderService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierPersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileAttributeConfigPersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileStatusPersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.ViewedPagesPersistenceService;
@ -59,6 +54,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeRequ
import com.iqser.red.service.persistence.service.v1.api.shared.model.AnalyzeResult;
import com.iqser.red.service.persistence.service.v1.api.shared.model.FileAttribute;
import com.iqser.red.service.persistence.service.v1.api.shared.model.MessageType;
import com.iqser.red.service.persistence.service.v1.api.shared.model.NerServiceRequest;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileModel;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;
@ -68,7 +64,9 @@ import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.Com
import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService;
import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter;
import com.knecon.fforesight.llm.service.LlmNerMessage;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames;
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
import com.knecon.fforesight.tenantcommons.TenantContext;
import jakarta.transaction.Transactional;
import lombok.AccessLevel;
@ -279,7 +277,7 @@ public class FileStatusService {
var layoutParsingRequest = layoutParsingRequestFactory.build(dossierId, fileId, priority);
setStatusFullProcessing(fileId);
log.info("Add file: {} from dossier {} to layout parsing request queue", fileId, dossierId);
rabbitTemplate.convertAndSend(LAYOUT_PARSING_REQUEST_QUEUE, layoutParsingRequest);
rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), layoutParsingRequest);
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
return;
}
@ -351,9 +349,9 @@ public class FileStatusService {
}
if (priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_QUEUE, analyseRequest);
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_REQUEST_EXCHANGE, TenantContext.getTenantId(), analyseRequest);
} else {
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_QUEUE, analyseRequest);
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_REQUEST_EXCHANGE, TenantContext.getTenantId(), analyseRequest);
}
sendAnalysisEvent(dossierId, fileId, fileEntity);
}
@ -385,7 +383,8 @@ public class FileStatusService {
private void addToVisualLayoutParsingQueue(String dossierId, String fileId) {
fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.VISUAL_LAYOUT_PARSING_ANALYZING);
rabbitTemplate.convertAndSend(MessagingConfiguration.VISUAL_LAYOUT_PARSING_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
VisualLayoutParsingServiceRequest.builder()
.dossierId(dossierId)
.fileId(fileId)
@ -412,7 +411,7 @@ public class FileStatusService {
setStatusPreProcessingQueued(fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.PRE_PROCESSING_QUEUE, processUntouchedDocumentRequest);
rabbitTemplate.convertAndSend(MessagingConfiguration.PREPROCESSING_EXCHANGE, TenantContext.getTenantId(), processUntouchedDocumentRequest);
}
@ -426,7 +425,8 @@ public class FileStatusService {
fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.FIGURE_DETECTION_ANALYZING);
rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
CvAnalysisServiceRequest.builder()
.dossierId(dossierId)
.fileId(fileId)
@ -445,7 +445,8 @@ public class FileStatusService {
fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.TABLE_PARSING_ANALYZING);
rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
CvAnalysisServiceRequest.builder()
.dossierId(dossierId)
.fileId(fileId)
@ -463,7 +464,8 @@ public class FileStatusService {
public void addToImageQueue(String dossierId, String fileId) {
setStatusImageAnalyzing(fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.IMAGE_SERVICE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.IMAGE_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
ImageServiceRequest.builder()
.dossierId(dossierId)
.fileId(fileId)
@ -496,7 +498,8 @@ public class FileStatusService {
protected void addToNerQueue(String dossierId, String fileId) {
setStatusNerAnalyzing(fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.NER_SERVICE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.ENTITY_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
NerServiceRequest.builder()
.dossierId(dossierId)
.fileId(fileId)
@ -533,7 +536,8 @@ public class FileStatusService {
protected void addToAzureNerQueue(String dossierId, String fileId) {
setStatusNerAnalyzing(fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.AZURE_NER_SERVICE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.AZURE_ENTITY_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
NerServiceRequest.builder()
.dossierId(dossierId)
.fileId(fileId)
@ -624,10 +628,13 @@ public class FileStatusService {
public void addToOcrQueue(String dossierId, String fileId, int priority) {
var removeWatermark = dossierTemplatePersistenceService.getDossierTemplate(dossierPersistenceService.getDossierTemplateId(dossierId)).isRemoveWatermark();
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId, removeWatermark), message -> {
message.getMessageProperties().setPriority(priority);
return message;
});
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
new DocumentRequest(dossierId, fileId, removeWatermark),
message -> {
message.getMessageProperties().setPriority(priority);
return message;
});
}

View File

@ -15,6 +15,7 @@ import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileStatusPersistenceService;
import com.iqser.red.service.search.v1.model.IndexMessage;
import com.iqser.red.service.search.v1.model.IndexMessageType;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.RequiredArgsConstructor;
@ -66,7 +67,8 @@ public class IndexingService {
public void addToIndexingQueue(IndexMessageType indexMessageType, String dossierTemplateId, String dossierId, String fileId, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_EXCHANGE,
TenantContext.getTenantId(),
IndexMessage.builder().messageType(indexMessageType).dossierTemplateId(dossierTemplateId).dossierId(dossierId).fileId(fileId).build(),
message -> {
message.getMessageProperties().setPriority(priority);
@ -77,10 +79,13 @@ public class IndexingService {
public void addToDeleteFromIndexQueue(String dossierId, String fileId, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_QUEUE, IndexMessage.builder().dossierId(dossierId).fileId(fileId).build(), message -> {
message.getMessageProperties().setPriority(priority);
return message;
});
rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_EXCHANGE,
TenantContext.getTenantId(),
IndexMessage.builder().dossierId(dossierId).fileId(fileId).build(),
message -> {
message.getMessageProperties().setPriority(priority);
return message;
});
}
}

View File

@ -8,6 +8,7 @@ import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJ
import com.iqser.red.service.persistence.management.v1.processor.service.websocket.WebsocketService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadStatusRepository;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -29,7 +30,9 @@ public class DownloadCompressingService {
if (updated == 1) {
websocketService.sendDownloadEvent(downloadId, userId, DownloadStatusValue.COMPRESSING);
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE, DownloadJob.builder().storageId(downloadId).userId(userId).build());
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_EXCHANGE,
TenantContext.getTenantId(),
DownloadJob.builder().storageId(downloadId).userId(userId).build());
}
}

View File

@ -1,13 +1,11 @@
package com.iqser.red.service.persistence.management.v1.processor.service.download;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
import lombok.RequiredArgsConstructor;
@ -19,13 +17,15 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class DownloadCompressionMessageReceiver {
public static final String DOWNLOAD_COMPRESSION_LISTENER_ID = "download-compression-listener";
private final DownloadCompressingService downloadCompressingService;
private final ObjectMapper objectMapper;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE)
@RabbitListener(id = DOWNLOAD_COMPRESSION_LISTENER_ID)
public void receive(DownloadJob downloadJob) throws JsonProcessingException {
downloadCompressingService.compressDownload(downloadJob.getStorageId());

View File

@ -56,7 +56,7 @@ public class DownloadDLQMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.REPORT_DLQ)
@RabbitListener(queues = MessagingConfiguration.REPORT_REQUEST_DLQ)
public void handleReportDlqMessage(Message failedMessage) {
var reportRequestMessage = objectMapper.readValue(failedMessage.getBody(), ReportRequestMessage.class);
@ -72,7 +72,7 @@ public class DownloadDLQMessageReceiver {
}
@RabbitListener(queues = MessagingConfiguration.REPORT_RESULT_DLQ)
@RabbitListener(queues = MessagingConfiguration.REPORT_RESPONSE_DLQ)
public void handleReportResponseDlqMessage(ReportResultMessage reportResultMessage) {
log.warn("Handling report request in DLQ userId: {} storageId: {} - setting status to error!", reportResultMessage.getUserId(), reportResultMessage.getDownloadId());

View File

@ -8,7 +8,6 @@ import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
import lombok.RequiredArgsConstructor;
@ -20,13 +19,15 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class DownloadMessageReceiver {
public static final String DOWNLOAD_LISTENER_ID = "download-listener";
private final DownloadProcessorService downloadProcessorService;
private final ObjectMapper objectMapper;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_QUEUE)
@RabbitListener(id = DOWNLOAD_LISTENER_ID)
public void receive(Message message) throws JsonProcessingException {
DownloadJob downloadJob = objectMapper.readValue(message.getBody(), DownloadJob.class);

View File

@ -86,7 +86,7 @@ public class DownloadPreparationService {
.forEach(fileEntity -> {
RedactionMessage message = messageBuilder.fileId(fileEntity.getId()).unapprovedFile(fileEntity.getWorkflowStatus() != WorkflowStatus.APPROVED).build();
log.info("Sending redaction request for downloadId:{} fileId:{} to pdftron-redaction-queue", downloadId, fileEntity.getId());
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_REQUEST_EXCHANGE, TenantContext.getTenantId(), message);
});
}

View File

@ -17,6 +17,7 @@ import com.iqser.red.service.persistence.management.v1.processor.service.websock
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
import com.iqser.red.service.redaction.report.v1.api.model.ReportRequestMessage;
import com.knecon.fforesight.tenantcommons.TenantContext;
import jakarta.transaction.Transactional;
import lombok.RequiredArgsConstructor;
@ -64,7 +65,7 @@ public class DownloadProcessorService {
private void addReportQueue(ReportRequestMessage reportRequestMessage, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.REPORT_QUEUE, reportRequestMessage, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.REPORT_REQUEST_EXCHANGE, TenantContext.getTenantId(), reportRequestMessage, message -> {
message.getMessageProperties().setPriority(priority);
return message;
});

View File

@ -23,13 +23,15 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class DownloadReportMessageReceiver {
public static final String REPORT_RESPONSE_LISTENER_ID = "report-response-listener";
ObjectMapper objectMapper;
DownloadPreparationService downloadPreparationService;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.REPORT_RESULT_QUEUE)
@RabbitListener(id = REPORT_RESPONSE_LISTENER_ID)
public void receive(Message message) {
ReportResultMessage reportResultMessage = objectMapper.readValue(message.getBody(), ReportResultMessage.class);

View File

@ -8,7 +8,7 @@ import com.iqser.red.service.persistence.management.v1.processor.configuration.M
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
import com.iqser.red.service.redaction.report.v1.api.model.ReportRequestMessage;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
@ -64,11 +64,11 @@ public class RedactionDlqMessageReceiver {
downloadPreparationService.clearRedactionStatusEntries(redactionMessage.getDownloadId());
} else {
downloadPreparationService.increaseProcessingErrorCounter(redactionMessage, numErrors);
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_REQUEST_EXCHANGE, TenantContext.getTenantId(), message);
}
} else {
downloadPreparationService.markFileAsProcessingError(redactionMessage);
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_REQUEST_EXCHANGE, TenantContext.getTenantId(), message);
}
}

View File

@ -23,13 +23,15 @@ import org.springframework.stereotype.Service;
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class RedactionResultMessageReceiver {
public static final String PDFTRON_RESPONSE_LISTENER_ID = "pdftron-response-listener";
ObjectMapper objectMapper;
DownloadPreparationService downloadPreparationService;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.PDFTRON_RESULT_QUEUE)
@RabbitListener(id = PDFTRON_RESPONSE_LISTENER_ID)
public void receive(Message message) throws JsonProcessingException {
RedactionResultMessage redactionResultMessage = objectMapper.readValue(message.getBody(), RedactionResultMessage.class);

View File

@ -51,7 +51,8 @@ import lombok.extern.slf4j.Slf4j;
log.debug("Found {} files which require analysis flag calculation. Tenant: {}", fileIdentifiers.size(), tenant.getDisplayName());
fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_QUEUE,
fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_EXCHANGE,
TenantContext.getTenantId(),
new AnalysisFlagCalculationMessage(fileIdentifier.dossierId(),
fileIdentifier.fileId())));
TenantContext.clear();

View File

@ -74,11 +74,12 @@ public class AutomaticAnalysisJob implements Job {
return;
}
var redactionQueueInfo = amqpAdmin.getQueueInfo(MessagingConfiguration.REDACTION_QUEUE);
String queueName = MessagingConfiguration.REDACTION_REQUEST_QUEUE_PREFIX + "_" + tenant.getTenantId();
var redactionQueueInfo = amqpAdmin.getQueueInfo(queueName);
if (redactionQueueInfo != null) {
log.debug("[Tenant:{}] Checking queue status to see if background analysis can happen. Currently {} holds {} elements and has {} consumers",
tenant.getTenantId(),
MessagingConfiguration.REDACTION_QUEUE,
queueName,
redactionQueueInfo.getMessageCount(),
redactionQueueInfo.getConsumerCount());
// only 1 file in queue
@ -105,7 +106,7 @@ public class AutomaticAnalysisJob implements Job {
}
} else {
log.info("[Tenant:{}] Failed to obtain queue info for queue: {}", TenantContext.getTenantId(), MessagingConfiguration.REDACTION_QUEUE);
log.info("[Tenant:{}] Failed to obtain queue info for queue: {}", TenantContext.getTenantId(), queueName);
}
});

View File

@ -81,6 +81,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.manual.Resi
import com.iqser.red.service.persistence.service.v1.api.shared.mongo.service.EntityLogMongoService;
import com.knecon.fforesight.databasetenantcommons.providers.utils.MagicConverter;
import com.knecon.fforesight.keycloakcommons.security.KeycloakSecurity;
import com.knecon.fforesight.tenantcommons.TenantContext;
import io.micrometer.observation.annotation.Observed;
import lombok.AccessLevel;
@ -667,7 +668,7 @@ public class ManualRedactionService {
.build();
log.info("Sending Surrounding Text Analysis for unprocessed manual redactions: {} for file: {} ", manualRedactions, fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_QUEUE, analyseRequest);
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_REQUEST_EXCHANGE, TenantContext.getTenantId(), analyseRequest);
}

View File

@ -6,7 +6,6 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.AnalysisFlagCalculationMessage;
import com.iqser.red.service.persistence.management.v1.processor.service.AnalysisFlagsCalculationService;
@ -22,13 +21,15 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class AnalysisFlagsCalculationMessageReceiver {
public static final String ANALYSIS_FLAG_CALCULATION_LISTENER_ID = "analysis-flag-calculation-listener";
AnalysisFlagsCalculationService analysisFlagsCalculationService;
ObjectMapper mapper;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_QUEUE)
@RabbitListener(id = ANALYSIS_FLAG_CALCULATION_LISTENER_ID)
public void receiveAnalysisFlagCalculationMessage(Message message) {
var analysisFlagCalculationMessage = mapper.readValue(message.getBody(), AnalysisFlagCalculationMessage.class);

View File

@ -27,14 +27,15 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class AzureNerMessageReceiver {
public static final String AZURE_ENTITY_RESPONSE_LISTENER_ID = "azure-entity-response-listener";
private final FileStatusService fileStatusService;
private final ObjectMapper objectMapper;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.AZURE_NER_SERVICE_RESPONSE_QUEUE)
@RabbitListener(id = AZURE_ENTITY_RESPONSE_LISTENER_ID)
public void receive(Message message) {
HashMap<String, Object> entityResponse = objectMapper.readValue(message.getBody(), new TypeReference<>() {
@ -43,13 +44,12 @@ public class AzureNerMessageReceiver {
String dossierId = (String) entityResponse.get("dossierId");
String fileId = (String) entityResponse.get("fileId");
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_ENTITY_RESPONSE_EXCHANGE, dossierId, fileId);
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
}
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.AZURE_NER_SERVICE_DLQ)
@RabbitListener(queues = MessagingConfiguration.AZURE_ENTITY_DLQ)
public void handleDLQMessage(Message failedMessage) throws IOException {
HashMap<String, Object> entityResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() {
@ -57,11 +57,11 @@ public class AzureNerMessageReceiver {
String dossierId = (String) entityResponse.get("dossierId");
String fileId = (String) entityResponse.get("fileId");
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_NER_SERVICE_DLQ, dossierId, fileId);
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.AZURE_ENTITY_DLQ, dossierId, fileId);
fileStatusProcessingUpdateService.analysisFailed(dossierId,
fileId,
new FileErrorInfo("azure ner service failed",
MessagingConfiguration.AZURE_NER_SERVICE_DLQ,
MessagingConfiguration.AZURE_ENTITY_DLQ,
"azure-ner-service",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}

View File

@ -24,6 +24,8 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class CvAnalysisMessageReceiver {
public static final String CV_ANALYSIS_RESPONSE_LISTENER_ID = "cv-analysis-response-listener";
private final ObjectMapper objectMapper;
private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@ -31,13 +33,16 @@ public class CvAnalysisMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.CV_ANALYSIS_RESPONSE_QUEUE)
@RabbitListener(id = CV_ANALYSIS_RESPONSE_LISTENER_ID)
public void receive(CvAnalysisServiceResponse response) {
addFileIdToTrace(response.getFileId());
fileStatusService.setStatusAnalyse(response.getDossierId(), response.getFileId(), false);
log.info("Received message in {} for dossierId {} and fileId {}", MessagingConfiguration.CV_ANALYSIS_RESPONSE_QUEUE, response.getDossierId(), response.getFileId());
log.info("Received message from exchange {} for dossierId {} and fileId {}",
MessagingConfiguration.CV_ANALYSIS_RESPONSE_EXCHANGE,
response.getDossierId(),
response.getFileId());
}
@ -48,11 +53,11 @@ public class CvAnalysisMessageReceiver {
var response = objectMapper.readValue(failedMessage.getBody(), CvAnalysisServiceResponse.class);
addFileIdToTrace(response.getFileId());
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_DLQ, response.getDossierId(), response.getFileId());
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_DLQ, response.getDossierId(), response.getFileId());
fileStatusProcessingUpdateService.analysisFailed(response.getDossierId(),
response.getFileId(),
new FileErrorInfo("cv analysis failed",
MessagingConfiguration.IMAGE_SERVICE_DLQ,
MessagingConfiguration.IMAGE_DLQ,
"cv-analysis-service-v1",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));

View File

@ -27,6 +27,8 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class ImageMessageReceiver {
public static final String IMAGE_RESPONSE_LISTENER_ID = "image-response-listener";
private final FileStatusService fileStatusService;
private final ObjectMapper objectMapper;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@ -34,7 +36,7 @@ public class ImageMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.IMAGE_SERVICE_RESPONSE_QUEUE)
@RabbitListener(id = IMAGE_RESPONSE_LISTENER_ID)
public void receive(Message message) {
JsonNode imageResponse = objectMapper.readTree(message.getBody());
@ -46,12 +48,11 @@ public class ImageMessageReceiver {
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
log.info("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
log.info("Received message from exchange {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_RESPONSE_EXCHANGE, dossierId, fileId);
}
@RabbitListener(queues = MessagingConfiguration.IMAGE_SERVICE_DLQ)
@RabbitListener(queues = MessagingConfiguration.IMAGE_DLQ)
public void handleDLQMessage(Message failedMessage) throws IOException {
HashMap<String, Object> imageResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() {
@ -60,11 +61,11 @@ public class ImageMessageReceiver {
String fileId = (String) imageResponse.get("fileId");
addFileIdToTrace(fileId);
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_DLQ, dossierId, fileId);
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_DLQ, dossierId, fileId);
fileStatusProcessingUpdateService.analysisFailed(dossierId,
fileId,
new FileErrorInfo("image analysis failed",
MessagingConfiguration.IMAGE_SERVICE_DLQ,
MessagingConfiguration.IMAGE_DLQ,
"image-service-v3",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));

View File

@ -1,6 +1,6 @@
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.LAYOUT_PARSING_DLQ;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_DLQ;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
@ -35,6 +35,8 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class LayoutParsingFinishedMessageReceiver {
public static final String LAYOUT_PARSING_RESPONSE_LISTENER_ID = "layout-parsing-response-listener";
private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
private final ObjectMapper objectMapper;
@ -45,12 +47,12 @@ public class LayoutParsingFinishedMessageReceiver {
@SneakyThrows
@RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE)
@RabbitListener(id = LAYOUT_PARSING_RESPONSE_LISTENER_ID)
public void receive(LayoutParsingFinishedEvent response) {
var dossierId = QueueMessageIdentifierService.parseDossierId(response.identifier());
var fileId = QueueMessageIdentifierService.parseFileId(response.identifier());
log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE);
log.info("Layout parsing has finished for {}/{} in {}", dossierId, fileId, LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE);
if (saasMigrationStatusPersistenceService.isMigrating(QueueMessageIdentifierService.parseFileId(response.identifier()))) {
saasMigrationService.handleLayoutParsingFinished(QueueMessageIdentifierService.parseDossierId(response.identifier()),
QueueMessageIdentifierService.parseFileId(response.identifier()));
@ -74,6 +76,7 @@ public class LayoutParsingFinishedMessageReceiver {
throw e;
}
log.info("Received message {} from exchange {}", response, LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE);
}
@ -91,7 +94,7 @@ public class LayoutParsingFinishedMessageReceiver {
saasMigrationService.handleError(QueueMessageIdentifierService.parseDossierId(analyzeRequest.identifier()),
QueueMessageIdentifierService.parseFileId(analyzeRequest.identifier()),
errorCause,
LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE);
LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE);
return;
}

View File

@ -29,6 +29,8 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class NerMessageReceiver {
public static final String ENTITY_RESPONSE_LISTENER_ID = "entity-response-listener";
private final FileStatusService fileStatusService;
private final ObjectMapper objectMapper;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@ -36,7 +38,7 @@ public class NerMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.NER_SERVICE_RESPONSE_QUEUE)
@RabbitListener(id = ENTITY_RESPONSE_LISTENER_ID)
public void receive(Message message) {
HashMap<String, Object> entityResponse = objectMapper.readValue(message.getBody(), new TypeReference<>() {
@ -46,7 +48,7 @@ public class NerMessageReceiver {
String fileId = (String) entityResponse.get("fileId");
addFileIdToTrace(fileId);
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
log.info("Received message from exchange {} for dossierId {} and fileId {}", MessagingConfiguration.ENTITY_RESPONSE_EXCHANGE, dossierId, fileId);
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
}
@ -64,7 +66,7 @@ public class NerMessageReceiver {
}
@RabbitListener(queues = MessagingConfiguration.NER_SERVICE_DLQ)
@RabbitListener(queues = MessagingConfiguration.ENTITY_DLQ)
public void handleDLQMessage(Message failedMessage) throws IOException {
HashMap<String, Object> entityResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() {
@ -73,11 +75,11 @@ public class NerMessageReceiver {
String fileId = (String) entityResponse.get("fileId");
addFileIdToTrace(fileId);
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.NER_SERVICE_DLQ, dossierId, fileId);
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.ENTITY_DLQ, dossierId, fileId);
fileStatusProcessingUpdateService.analysisFailed(dossierId,
fileId,
new FileErrorInfo("ner service failed",
MessagingConfiguration.NER_SERVICE_DLQ,
MessagingConfiguration.ENTITY_DLQ,
"entity-recognition-service-v3",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}
@ -87,7 +89,7 @@ public class NerMessageReceiver {
public void handleLLMDLQMessage(Message failedMessage) throws IOException {
LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class);
String dossierId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier());
String fileId = QueueMessageIdentifierService.parseDossierId(entityResponse.getIdentifier());

View File

@ -29,6 +29,9 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class OCRProcessingMessageReceiver {
public static final String OCR_RESPONSE_LISTENER_ID = "ocr-response-listener";
public static final String OCR_STATUS_UPDATE_LISTENER_ID = "ocr-status-update-listener";
private final ObjectMapper objectMapper;
private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@ -36,7 +39,8 @@ public class OCRProcessingMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE)
@RabbitHandler
@RabbitListener(id = OCR_STATUS_UPDATE_LISTENER_ID)
public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) {
var fileModel = fileStatusService.getStatus(response.getFileId());
@ -53,22 +57,23 @@ public class OCRProcessingMessageReceiver {
response.getNumberOfOCRedPages());
}
log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE);
log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE);
}
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL)
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_DLQ)
public void handleOCRStatusUpdateDLQMessage(Message failedMessage) {
var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class);
log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL);
log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_DLQ);
}
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE)
@RabbitListener(id = OCR_RESPONSE_LISTENER_ID)
public void handleOCRResponseMessage(Message successMessage) throws IOException {
DocumentRequest ocrResponseMessage = objectMapper.readValue(successMessage.getBody(), DocumentRequest.class);
@ -80,7 +85,7 @@ public class OCRProcessingMessageReceiver {
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_DLQ)
public void handleOCRDQLMessage(Message failedMessage) throws IOException {
public void handleOCRDLQMessage(Message failedMessage) throws IOException {
DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class);

View File

@ -11,7 +11,6 @@ import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.entity.annotations.ManualRedactionEntryEntity;
import com.iqser.red.service.persistence.management.v1.processor.entity.annotations.ManualResizeRedactionEntity;
import com.iqser.red.service.persistence.management.v1.processor.entity.annotations.RectangleEntity;
@ -33,6 +32,8 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class RedactionAnalysisResponseReceiver {
public static final String REDACTION_RESPONSE_LISTENER_ID = "redaction-response-listener";
ObjectMapper objectMapper;
AddRedactionPersistenceService addRedactionPersistenceService;
ResizeRedactionPersistenceService resizeRedactionPersistenceService;
@ -40,7 +41,7 @@ public class RedactionAnalysisResponseReceiver {
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.REDACTION_ANALYSIS_RESPONSE_QUEUE)
@RabbitListener(id = REDACTION_RESPONSE_LISTENER_ID)
public void receive(Message message) throws JsonProcessingException {
AnalyzeResponse analyzeResponse = objectMapper.readValue(message.getBody(), new TypeReference<>() {

View File

@ -1,7 +1,7 @@
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_DLQ;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_QUEUE;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_REQUEST_QUEUE;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.MIGRATION_RESPONSE_QUEUE;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.X_ERROR_INFO_HEADER;
@ -46,7 +46,7 @@ public class RedactionServiceSaasMigrationMessageReceiver {
if (errorCause == null) {
errorCause = "Error occured during entityLog migration!";
}
saasMigrationService.handleError(migrationRequest.getDossierId(), migrationRequest.getFileId(), errorCause, MIGRATION_QUEUE);
saasMigrationService.handleError(migrationRequest.getDossierId(), migrationRequest.getFileId(), errorCause, MIGRATION_REQUEST_QUEUE);
}

View File

@ -0,0 +1,166 @@
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.*;
import static com.iqser.red.service.persistence.management.v1.processor.dataexchange.ExportDownloadMessageReceiver.EXPORT_DOWNLOAD_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadCompressionMessageReceiver.DOWNLOAD_COMPRESSION_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadMessageReceiver.DOWNLOAD_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadReportMessageReceiver.REPORT_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.download.RedactionResultMessageReceiver.PDFTRON_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AnalysisFlagsCalculationMessageReceiver.ANALYSIS_FLAG_CALCULATION_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AzureNerMessageReceiver.AZURE_ENTITY_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.CvAnalysisMessageReceiver.CV_ANALYSIS_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.ImageMessageReceiver.IMAGE_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.LayoutParsingFinishedMessageReceiver.LAYOUT_PARSING_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.NerMessageReceiver.ENTITY_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_STATUS_UPDATE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.RedactionAnalysisResponseReceiver.REDACTION_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.VisualLayoutParsingMessageReceiver.VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID;
import java.util.Map;
import java.util.Set;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService;
import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver;
@Service
public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver {
public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) {
super(rabbitQueueService, tenantProvider);
}
@Override
protected Set<TenantQueueConfiguration> getTenantQueueConfigs() {
return Set.of(TenantQueueConfiguration.builder()
.listenerId(DOWNLOAD_LISTENER_ID)
.exchangeName(DOWNLOAD_EXCHANGE)
.queuePrefix(DOWNLOAD_QUEUE_PREFIX)
.dlqName(DOWNLOAD_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(DOWNLOAD_COMPRESSION_LISTENER_ID)
.exchangeName(DOWNLOAD_COMPRESSION_EXCHANGE)
.queuePrefix(DOWNLOAD_COMPRESSION_QUEUE_PREFIX)
.dlqName(DOWNLOAD_COMPRESSION_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(EXPORT_DOWNLOAD_LISTENER_ID)
.exchangeName(EXPORT_DOWNLOAD_EXCHANGE)
.queuePrefix(EXPORT_DOWNLOAD_QUEUE_PREFIX)
.dlqName(EXPORT_DOWNLOAD_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(ANALYSIS_FLAG_CALCULATION_LISTENER_ID)
.exchangeName(ANALYSIS_FLAG_CALCULATION_EXCHANGE)
.queuePrefix(ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX)
.build(),
TenantQueueConfiguration.builder()
.listenerId(REDACTION_RESPONSE_LISTENER_ID)
.exchangeName(REDACTION_RESPONSE_EXCHANGE)
.queuePrefix(REDACTION_RESPONSE_QUEUE_PREFIX)
.dlqName(REDACTION_DLQ)
.arguments(Map.of("x-max-priority", 2))
.build(),
TenantQueueConfiguration.builder()
.listenerId(PDFTRON_RESPONSE_LISTENER_ID)
.exchangeName(PDFTRON_RESPONSE_EXCHANGE)
.queuePrefix(PDFTRON_RESPONSE_QUEUE_PREFIX)
.dlqName(PDFTRON_DLQ)
.arguments(Map.of("x-max-priority", 2))
.build(),
TenantQueueConfiguration.builder()
.listenerId(REPORT_RESPONSE_LISTENER_ID)
.exchangeName(REPORT_RESPONSE_EXCHANGE)
.queuePrefix(REPORT_RESPONSE_QUEUE_PREFIX)
.dlqName(REPORT_RESPONSE_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(OCR_RESPONSE_LISTENER_ID)
.exchangeName(OCR_RESPONSE_EXCHANGE)
.queuePrefix(OCR_RESPONSE_QUEUE_PREFIX)
.build(),
TenantQueueConfiguration.builder()
.listenerId(OCR_STATUS_UPDATE_LISTENER_ID)
.exchangeName(OCR_STATUS_UPDATE_EXCHANGE)
.queuePrefix(OCR_STATUS_UPDATE_QUEUE_PREFIX)
.dlqName(OCR_STATUS_UPDATE_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(IMAGE_RESPONSE_LISTENER_ID)
.exchangeName(IMAGE_RESPONSE_EXCHANGE)
.queuePrefix(IMAGE_RESPONSE_QUEUE_PREFIX)
.dlqName(IMAGE_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(ENTITY_RESPONSE_LISTENER_ID)
.exchangeName(ENTITY_RESPONSE_EXCHANGE)
.queuePrefix(ENTITY_RESPONSE_QUEUE_PREFIX)
.dlqName(ENTITY_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(AZURE_ENTITY_RESPONSE_LISTENER_ID)
.exchangeName(AZURE_ENTITY_RESPONSE_EXCHANGE)
.queuePrefix(AZURE_ENTITY_RESPONSE_QUEUE_PREFIX)
.dlqName(AZURE_ENTITY_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(CV_ANALYSIS_RESPONSE_LISTENER_ID)
.exchangeName(CV_ANALYSIS_RESPONSE_EXCHANGE)
.queuePrefix(CV_ANALYSIS_RESPONSE_QUEUE_PREFIX)
.dlqName(CV_ANALYSIS_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID)
.exchangeName(VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE)
.queuePrefix(VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX)
.dlqName(VISUAL_LAYOUT_PARSING_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(LAYOUT_PARSING_RESPONSE_LISTENER_ID)
.exchangeName(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE)
.queuePrefix(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX)
.build());
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
System.out.println("application ready invoked");
super.initializeQueues();
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}")
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
super.reactToTenantCreation(tenantCreatedEvent);
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}")
public void reactToTenantDeletion(TenantResponse tenantResponse) {
super.reactToTenantDeletion(tenantResponse);
}
}

View File

@ -23,32 +23,37 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class VisualLayoutParsingMessageReceiver {
public static final String VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID = "visual-layout-parsing-response-listener";
private final ObjectMapper objectMapper;
private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.VISUAL_LAYOUT_RESPONSE_QUEUE)
@RabbitListener(id = VISUAL_LAYOUT_PARSING_RESPONSE_LISTENER_ID)
public void receive(VisualLayoutParsingServiceResponse response) {
fileStatusService.setStatusAnalyse(response.getDossierId(), response.getFileId(), false);
log.info("Received message in {} for dossierId {} and fileId {}", MessagingConfiguration.VISUAL_LAYOUT_RESPONSE_QUEUE, response.getDossierId(), response.getFileId());
log.info("Received message from exchange {} for dossierId {} and fileId {}",
MessagingConfiguration.VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE,
response.getDossierId(),
response.getFileId());
}
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.VISUAL_LAYOUT_DLQ)
@RabbitListener(queues = MessagingConfiguration.VISUAL_LAYOUT_PARSING_DLQ)
public void handleDLQMessage(Message failedMessage) {
var response = objectMapper.readValue(failedMessage.getBody(), VisualLayoutParsingServiceResponse.class);
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.VISUAL_LAYOUT_DLQ, response.getDossierId(), response.getFileId());
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.VISUAL_LAYOUT_PARSING_DLQ, response.getDossierId(), response.getFileId());
fileStatusProcessingUpdateService.analysisFailed(response.getDossierId(),
response.getFileId(),
new FileErrorInfo("table extractor failed",
MessagingConfiguration.VISUAL_LAYOUT_DLQ,
MessagingConfiguration.VISUAL_LAYOUT_PARSING_DLQ,
"table-extractor",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));

View File

@ -23,7 +23,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.quartz.Scheduler;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
@ -245,7 +247,9 @@ public abstract class AbstractPersistenceServerServiceTest {
@Autowired
protected PrometheusMeterRegistry prometheusMeterRegistry;
@MockBean
private AmqpAdmin amqpAdmin;
private RabbitAdmin rabbitAdmin;
@MockBean
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@Autowired
private DossierStatusRepository dossierStatusRepository;
@Autowired
@ -351,7 +355,7 @@ public abstract class AbstractPersistenceServerServiceTest {
when(usersClient.getAllUsers(true)).thenReturn(allUsers);
// doNothing().when(pdfTronRedactionClient).testDigitalCurrentSignature(Mockito.any());
when(amqpAdmin.getQueueInfo(Mockito.any())).thenReturn(null);
when(rabbitAdmin.getQueueInfo(Mockito.any())).thenReturn(null);
when(entityLogService.getEntityLog(Mockito.any(), Mockito.any())).thenReturn(new EntityLog(1, 1, Lists.newArrayList(), null, 0, 0, 0, 0));
when(entityLogService.getEntityLog(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())).thenReturn(new EntityLog(1,
1,
@ -590,7 +594,8 @@ public abstract class AbstractPersistenceServerServiceTest {
"MONGODB_USER=" + MONGO_USERNAME,
"MONGODB_PASSWORD=" + MONGO_PASSWORD,
"fforesight.jobs.enabled=false",
"fforesight.keycloak.enabled=false").applyTo(configurableApplicationContext.getEnvironment());
"fforesight.keycloak.enabled=false",
"POD_NAME=persistence-service").applyTo(configurableApplicationContext.getEnvironment());
}

View File

@ -9,7 +9,7 @@ dependencies {
api(project(":persistence-service-shared-api-v1"))
api("com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.2")
api("com.google.guava:guava:31.1-jre")
api("com.knecon.fforesight:mongo-database-commons:0.10.0")
api("com.knecon.fforesight:mongo-database-commons:0.11.0")
api("org.springframework.boot:spring-boot-starter-data-mongodb:${springBootStarterVersion}")
api("org.springframework.boot:spring-boot-starter-validation:3.1.3")
testImplementation("com.iqser.red.commons:test-commons:2.1.0")