Merge branch 'tenants-retry' into 'master'

Tenant retry and queue renames

See merge request redactmanager/persistence-service!710
This commit is contained in:
Maverick Studer 2024-08-29 15:11:04 +02:00
commit ff04d2a5ef
17 changed files with 285 additions and 233 deletions

View File

@ -30,7 +30,7 @@ dependencies {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
}
api("com.knecon.fforesight:layoutparser-service-internal-api:0.161.0") {
api("com.knecon.fforesight:layoutparser-service-internal-api:0.163.0") {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
}
@ -42,10 +42,15 @@ dependencies {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
}
implementation("com.knecon.fforesight:llm-service-api:1.10.0")
implementation("com.knecon.fforesight:llm-service-api:1.13.0")
api("com.knecon.fforesight:jobs-commons:0.10.0")
api("com.knecon.fforesight:database-tenant-commons:0.24.0")
api("com.knecon.fforesight:keycloak-commons:0.30.0")
api("com.knecon.fforesight:tenant-commons:0.29.0")
api("com.knecon.fforesight:database-tenant-commons:0.24.0") {
exclude(group = "com.knecon.fforesight", module = "tenant-commons")
}
api("com.knecon.fforesight:keycloak-commons:0.30.0") {
exclude(group = "com.knecon.fforesight", module = "tenant-commons")
}
api("com.knecon.fforesight:tracing-commons:0.5.0")
api("com.knecon.fforesight:swagger-commons:0.7.0")
api("com.giffing.bucket4j.spring.boot.starter:bucket4j-spring-boot-starter:0.4.0")

View File

@ -18,107 +18,114 @@ import lombok.RequiredArgsConstructor;
public class MessagingConfiguration {
// persistence-service
public static final String DOWNLOAD_QUEUE_PREFIX = "download_queue";
public static final String DOWNLOAD_EXCHANGE = "download_exchange";
public static final String DOWNLOAD_DLQ = "download_dlq";
public static final String DOWNLOAD_REQUEST_QUEUE_PREFIX = "download_request";
public static final String DOWNLOAD_REQUEST_EXCHANGE = "download_request_exchange";
public static final String DOWNLOAD_DLQ = "download_error";
public static final String DOWNLOAD_COMPRESSION_QUEUE_PREFIX = "download_compression_queue";
public static final String DOWNLOAD_COMPRESSION_EXCHANGE = "download_compression_exchange";
public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_dlq";
public static final String DOWNLOAD_COMPRESSION_REQUEST_QUEUE_PREFIX = "download_compression_request";
public static final String DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE = "download_compression_request_exchange";
public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_error";
public static final String EXPORT_DOWNLOAD_QUEUE_PREFIX = "export_download_queue";
public static final String EXPORT_DOWNLOAD_EXCHANGE = "export_download_exchange";
public static final String EXPORT_DOWNLOAD_DLQ = "export_download_dlq";
public static final String EXPORT_DOWNLOAD_REQUEST_QUEUE_PREFIX = "export_download_request";
public static final String EXPORT_DOWNLOAD_REQUEST_EXCHANGE = "export_download_request_exchange";
public static final String EXPORT_DOWNLOAD_DLQ = "export_download_error";
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX = "analysis_flag_calculation_queue";
public static final String ANALYSIS_FLAG_CALCULATION_EXCHANGE = "analysis_flag_calculation_exchange";
public static final String ANALYSIS_FLAG_CALCULATION_REQUEST_QUEUE_PREFIX = "analysis_flag_calculation_request";
public static final String ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE = "analysis_flag_calculation_request_exchange";
public static final String REDACTION_RESPONSE_QUEUE_PREFIX = "redaction_response_queue";
public static final String REDACTION_RESPONSE_QUEUE_PREFIX = "redaction_response";
public static final String REDACTION_RESPONSE_EXCHANGE = "redaction_response_exchange";
public static final String PDFTRON_RESPONSE_QUEUE_PREFIX = "pdftron_response_queue";
public static final String PDFTRON_RESPONSE_QUEUE_PREFIX = "pdftron_response";
public static final String PDFTRON_RESPONSE_EXCHANGE = "pdftron_response_exchange";
public static final String REPORT_RESPONSE_QUEUE_PREFIX = "report_response_queue";
public static final String REPORT_RESPONSE_QUEUE_PREFIX = "report_response";
public static final String REPORT_RESPONSE_EXCHANGE = "report_response_exchange";
public static final String REPORT_RESPONSE_DLQ = "report_response_dlq";
public static final String REPORT_RESPONSE_DLQ = "report_response_error";
public static final String OCR_RESPONSE_QUEUE_PREFIX = "ocr_response_queue";
public static final String OCR_RESPONSE_QUEUE_PREFIX = "ocr_response";
public static final String OCR_RESPONSE_EXCHANGE = "ocr_response_exchange";
public static final String OCR_STATUS_UPDATE_QUEUE_PREFIX = "ocr_status_update_queue";
public static final String OCR_STATUS_UPDATE_EXCHANGE = "ocr_status_update_exchange";
public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_dlq";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE_PREFIX = "ocr_status_update_response";
public static final String OCR_STATUS_UPDATE_RESPONSE_EXCHANGE = "ocr_status_update_response_exchange";
public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_error";
public static final String IMAGE_RESPONSE_QUEUE_PREFIX = "image_response_queue";
public static final String IMAGE_RESPONSE_QUEUE_PREFIX = "image_response";
public static final String IMAGE_RESPONSE_EXCHANGE = "image_response_exchange";
public static final String ENTITY_RESPONSE_QUEUE_PREFIX = "entity_response_queue";
public static final String ENTITY_RESPONSE_QUEUE_PREFIX = "entity_response";
public static final String ENTITY_RESPONSE_EXCHANGE = "entity_response_exchange";
public static final String AZURE_ENTITY_RESPONSE_QUEUE_PREFIX = "azure_entity_response_queue";
public static final String AZURE_ENTITY_RESPONSE_QUEUE_PREFIX = "azure_entity_response";
public static final String AZURE_ENTITY_RESPONSE_EXCHANGE = "azure_entity_response_exchange";
public static final String CHUNKING_SERVICE_QUEUE = "chunking_service_request_queue";
public static final String CHUNKING_SERVICE_RESPONSE_QUEUE = "chunking_service_response_queue";
public static final String CHUNKING_SERVICE_DLQ = "chunking_service_dlq";
public static final String CHUNKING_RESPONSE_QUEUE_PREFIX = "chunking_response";
public static final String CHUNKING_RESPONSE_EXCHANGE = "chunking_response_exchange";
public static final String CHUNKING_DLQ = "chunking_error";
public static final String LLM_NER_SERVICE_QUEUE = QueueNames.LLM_NER_SERVICE_QUEUE;
public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE;
public static final String LLM_NER_SERVICE_DLQ = QueueNames.LLM_NER_SERVICE_DLQ;
public static final String LLM_NER_RESPONSE_QUEUE_PREFIX = QueueNames.LLM_NER_RESPONSE_QUEUE_PREFIX;
public static final String LLM_NER_RESPONSE_EXCHANGE = QueueNames.LLM_NER_RESPONSE_EXCHANGE;
public static final String LLM_NER_DLQ = QueueNames.LLM_NER_DLQ;
public static final String CV_ANALYSIS_RESPONSE_QUEUE_PREFIX = "cv_analysis_response_queue";
public static final String CV_ANALYSIS_RESPONSE_QUEUE_PREFIX = "cv_analysis_response";
public static final String CV_ANALYSIS_RESPONSE_EXCHANGE = "cv_analysis_response_exchange";
public static final String VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "visual_layout_parsing_response_queue";
public static final String VISUAL_LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "visual_layout_parsing_response";
public static final String VISUAL_LAYOUT_PARSING_RESPONSE_EXCHANGE = "visual_layout_parsing_response_exchange";
// pdftron-redaction-service
public static final String PREPROCESSING_EXCHANGE = "preprocessing_exchange";
public static final String PREPROCESSING_DLQ = "preprocessing_dlq";
public static final String PREPROCESSING_REQUEST_EXCHANGE = "preprocessing_request_exchange";
public static final String PREPROCESSING_DLQ = "preprocessing_error";
public static final String PDFTRON_REQUEST_EXCHANGE = "pdftron_request_exchange";
public static final String PDFTRON_DLQ = "pdftron_dlq";
public static final String PDFTRON_DLQ = "pdftron_error";
// redaction-service
public static final String REDACTION_REQUEST_QUEUE_PREFIX = "redaction_request_queue";
public static final String REDACTION_REQUEST_QUEUE_PREFIX = "redaction_request";
public static final String REDACTION_REQUEST_EXCHANGE = "redaction_request_exchange";
public static final String REDACTION_PRIORITY_REQUEST_QUEUE_PREFIX = "redaction_priority_request";
public static final String REDACTION_PRIORITY_REQUEST_EXCHANGE = "redaction_priority_request_exchange";
public static final String REDACTION_DLQ = "redaction_dlq";
public static final String REDACTION_DLQ = "redaction_error";
// redaction-report-service
public static final String REPORT_REQUEST_EXCHANGE = "report_request_exchange";
public static final String REPORT_REQUEST_DLQ = "report_request_dlq";
public static final String REPORT_REQUEST_DLQ = "report_request_error";
// search-service
public static final String INDEXING_EXCHANGE = "indexing_exchange";
public static final String INDEXING_DLQ = "indexing_dlq";
public static final String INDEXING_REQUEST_EXCHANGE = "indexing_request_exchange";
public static final String INDEXING_DLQ = "indexing_error";
public static final String DELETE_FROM_INDEX_EXCHANGE = "delete_from_index_exchange";
public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_dlq";
public static final String DELETE_FROM_INDEX_REQUEST_EXCHANGE = "delete_from_index_request_exchange";
public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_error";
// ocr-services
public static final String OCR_REQUEST_EXCHANGE = "ocr_request_exchange";
public static final String OCR_DLQ = "ocr_dlq";
public static final String OCR_DLQ = "ocr_error";
// image-service
public static final String IMAGE_REQUEST_EXCHANGE = "image_request_exchange";
public static final String IMAGE_DLQ = "image_dlq";
public static final String IMAGE_DLQ = "image_error";
// entity-recognition-service
public static final String ENTITY_REQUEST_EXCHANGE = "entity_request_exchange";
public static final String ENTITY_DLQ = "entity_dlq";
public static final String ENTITY_DLQ = "entity_error";
// azure-ner-service
public static final String AZURE_ENTITY_REQUEST_EXCHANGE = "azure_entity_request_exchange";
public static final String AZURE_ENTITY_DLQ = "azure_entity_dlq";
public static final String AZURE_ENTITY_DLQ = "azure_entity_error";
// cv-analysis-service
public static final String CV_ANALYSIS_REQUEST_EXCHANGE = "cv_analysis_request_exchange";
public static final String CV_ANALYSIS_DLQ = "cv_analysis_dlq";
public static final String CV_ANALYSIS_DLQ = "cv_analysis_error";
// visual-layout-parsing-service
public static final String VISUAL_LAYOUT_PARSING_REQUEST_EXCHANGE = "visual_layout_parsing_request_exchange";
public static final String VISUAL_LAYOUT_PARSING_DLQ = "visual_layout_parsing_dlq";
public static final String VISUAL_LAYOUT_PARSING_DLQ = "visual_layout_parsing_error";
//chunking-service
public static final String CHUNKING_REQUEST_EXCHANGE = "chunking_request_exchange";
//llm-ner-service
public static final String LLM_NER_REQUEST_EXCHANGE = QueueNames.LLM_NER_REQUEST_EXCHANGE;
public static final String X_ERROR_INFO_HEADER = "x-error-message";
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
@ -133,7 +140,7 @@ public class MessagingConfiguration {
@Bean
public DirectExchange downloadExchange() {
return new DirectExchange(DOWNLOAD_EXCHANGE);
return new DirectExchange(DOWNLOAD_REQUEST_EXCHANGE);
}
@ -147,7 +154,7 @@ public class MessagingConfiguration {
@Bean
public DirectExchange downloadCompressionExchange() {
return new DirectExchange(DOWNLOAD_COMPRESSION_EXCHANGE);
return new DirectExchange(DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE);
}
@ -161,7 +168,7 @@ public class MessagingConfiguration {
@Bean
public DirectExchange exportDownloadExchange() {
return new DirectExchange(EXPORT_DOWNLOAD_EXCHANGE);
return new DirectExchange(EXPORT_DOWNLOAD_REQUEST_EXCHANGE);
}
@ -175,7 +182,7 @@ public class MessagingConfiguration {
@Bean
public DirectExchange analysisFlagCalculationExchange() {
return new DirectExchange(ANALYSIS_FLAG_CALCULATION_EXCHANGE);
return new DirectExchange(ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE);
}
@ -194,50 +201,30 @@ public class MessagingConfiguration {
@Bean
public Queue llmNerRequestQueue() {
public DirectExchange chunkingResponseExchange() {
return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build();
}
@Bean
public Queue llmNerResponseQueue() {
return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ)
.build();
}
@Bean
public Queue llmNerResponseDLQ() {
return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build();
}
@Bean
public Queue chunkingRequestQueue() {
return QueueBuilder.durable(CHUNKING_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ).build();
}
@Bean
public Queue chunkingResponseQueue() {
return QueueBuilder.durable(CHUNKING_SERVICE_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", CHUNKING_SERVICE_DLQ)
.build();
return new DirectExchange(CHUNKING_RESPONSE_EXCHANGE);
}
@Bean
public Queue chunkingDLQ() {
return QueueBuilder.durable(CHUNKING_SERVICE_DLQ).build();
return QueueBuilder.durable(CHUNKING_DLQ).build();
}
@Bean
public DirectExchange llmNerResponseExchange() {
return new DirectExchange(LLM_NER_RESPONSE_EXCHANGE);
}
@Bean
public Queue llmNerDLQ() {
return QueueBuilder.durable(LLM_NER_DLQ).build();
}
@ -268,6 +255,7 @@ public class MessagingConfiguration {
return new DirectExchange(ENTITY_RESPONSE_EXCHANGE);
}
@Bean
public DirectExchange azureEntityResponseExchange() {
@ -298,19 +286,33 @@ public class MessagingConfiguration {
// ---- pdftron-redaction-service ----
@Bean
public DirectExchange preprocessingExchange() {
public DirectExchange preprocessingRequestExchange() {
return new DirectExchange(PREPROCESSING_EXCHANGE);
return new DirectExchange(PREPROCESSING_REQUEST_EXCHANGE);
}
@Bean
public Queue preprocessingDeadLetterQueue() {
public DirectExchange preprocessingResponseExchange() {
return new DirectExchange(PREPROCESSING_REQUEST_EXCHANGE);
}
@Bean
public Queue preprocessingDLQ() {
return QueueBuilder.durable(PREPROCESSING_DLQ).build();
}
@Bean
public DirectExchange pdftronRequestExchange() {
return new DirectExchange(PDFTRON_REQUEST_EXCHANGE);
}
@Bean
public DirectExchange pdftronResponseExchange() {
@ -364,9 +366,9 @@ public class MessagingConfiguration {
// ---- search-service ----
@Bean
public DirectExchange indexingExchange() {
public DirectExchange indexingRequestExchange() {
return new DirectExchange(INDEXING_EXCHANGE);
return new DirectExchange(INDEXING_REQUEST_EXCHANGE);
}
@ -378,9 +380,9 @@ public class MessagingConfiguration {
@Bean
public DirectExchange deleteFromIndexExchange() {
public DirectExchange deleteFromIndexRequestExchange() {
return new DirectExchange(DELETE_FROM_INDEX_EXCHANGE);
return new DirectExchange(DELETE_FROM_INDEX_REQUEST_EXCHANGE);
}
@ -407,9 +409,9 @@ public class MessagingConfiguration {
@Bean
public DirectExchange ocrStatusUpdateExchange() {
public DirectExchange ocrStatusUpdateResponseExchange() {
return new DirectExchange(OCR_STATUS_UPDATE_EXCHANGE);
return new DirectExchange(OCR_STATUS_UPDATE_RESPONSE_EXCHANGE);
}
@ -448,7 +450,10 @@ public class MessagingConfiguration {
return QueueBuilder.durable(ENTITY_DLQ).build();
}
// ---- azure-ner-service ----
@Bean
public DirectExchange azureNerRequestExchange() {
@ -508,6 +513,22 @@ public class MessagingConfiguration {
}
// ---- chunking-service ----
@Bean
public DirectExchange chunkingRequestExchange() {
return new DirectExchange(CHUNKING_REQUEST_EXCHANGE);
}
// ----- llm-ner-service ----
@Bean
public DirectExchange llmNerRequestExchange() {
return new DirectExchange(LLM_NER_REQUEST_EXCHANGE);
}
// ---- Saas Migration ----
@Bean
public Queue migrationQueue() {

View File

@ -16,13 +16,13 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class UserMessagingConfiguration {
public static final String PERSISTENCE_SERVICE_USER_CREATED_QUEUE = "persistence-service-user-created-queue";
public static final String PERSISTENCE_SERVICE_USER_DELETED_QUEUE = "persistence-service-user-deleted-queue";
public static final String PERSISTENCE_SERVICE_USER_UPDATED_QUEUE = "persistence-service-user-updated-queue";
public static final String PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE = "persistence-service-user-status-changed-queue";
public static final String PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE = "persistence-service-user-roles-updated-queue";
public static final String PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE = "persistence-service-user-own-profile-updated-queue";
public static final String PERSISTENCE_SERVICE_USER_EVENTS_DLQ = "persistence-service-user-events-dlq";
public static final String PERSISTENCE_SERVICE_USER_CREATED_QUEUE = "persistence-service-user-created";
public static final String PERSISTENCE_SERVICE_USER_DELETED_QUEUE = "persistence-service-user-deleted";
public static final String PERSISTENCE_SERVICE_USER_UPDATED_QUEUE = "persistence-service-user-updated";
public static final String PERSISTENCE_SERVICE_USER_STATUS_CHANGED_QUEUE = "persistence-service-user-status-changed";
public static final String PERSISTENCE_SERVICE_USER_ROLES_UPDATED_QUEUE = "persistence-service-user-roles-updated";
public static final String PERSISTENCE_SERVICE_USER_OWN_PROFILE_UPDATED_QUEUE = "persistence-service-user-own-profile-updated";
public static final String PERSISTENCE_SERVICE_USER_EVENTS_DLQ = "persistence-service-user-events-error";
@Bean("persistenceServiceUserRolesUpdatedQueue")

View File

@ -119,7 +119,7 @@ public class DossierTemplateExportService {
private void addToExportDownloadQueue(ExportDownloadMessage downloadJob, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_REQUEST_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
message.getMessageProperties().setPriority(priority);
return message;
});

View File

@ -75,7 +75,7 @@ public class FileExchangeExportService {
private void addToExportDownloadQueue(ExportDownloadMessage downloadJob) {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.EXPORT_DOWNLOAD_REQUEST_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
message.getMessageProperties().setPriority(1);
return message;
});

View File

@ -10,92 +10,99 @@ import com.iqser.red.service.persistence.management.v1.processor.migration.Migra
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Setter
@Service
public class QueueRenameMigration23 extends Migration {
private final AmqpAdmin amqpAdmin;
private static final String NAME = "Migration for renaming of most queues";
private static final long VERSION = 23;
private static final List<String> queueNames = List.of("analysis_flag_calculation_queue",
"cv_analysis_dead_letter_queue",
"cv_analysis_request_queue",
"cv_analysis_response_queue",
"deleteFromIndexDLQ",
"deleteFromIndexQueue",
"downloadDLQ",
"downloadQueue",
"download_compression_dlq",
"download_compression_queue",
"entity_dead_letter_queue",
"entity_request_queue",
"entity_response_queue",
"exportDownloadDLQ",
"exportDownloadQueue",
"image_dead_letter_queue",
"image_request_queue",
"image_response_queue",
"indexingDQL",
"indexingQueue",
"layout_parsing_dead_letter_queue",
"layout_parsing_request_queue",
"layout_parsing_response_queue",
"migrationDLQ",
"migrationQueue",
"migrationResponseQueue",
"mongo-tenant-created",
"mongo-tenant-created-dlq",
"ocr_dead_letter_queue",
"ocr_request_queue",
"ocr_response_queue",
"ocr_status_update_dead_letter_queue",
"ocr_status_update_response_queue",
"pdftron_dlq",
"pdftron_queue",
"pdftron_result_queue",
"persistence-service-user-created-queue",
"persistence-service-user-deleted-queue",
"persistence-service-user-events-dql",
"persistence-service-user-own-profile-updated-queue",
"persistence-service-user-roles-updated-queue",
"persistence-service-user-status-changed-queue",
"persistence-service-user-updated-queue",
"preprocessingDLQ",
"preprocessingQueue",
"redactionAnalysisResponseQueue",
"redactionDQL",
"redactionPriorityQueue",
"redactionQueue",
"reportDLQ",
"reportQueue",
"reportResultDLQ",
"reportResultQueue",
"tenant-created",
"tenant-created-dlq",
"tenant-delete-dlq",
"tenant-delete-queue",
"tenant-sync",
"tenant-sync-dlq",
"visual_layout_parsing_service_dead_letter_queue",
"visual_layout_parsing_service_queue",
"visual_layout_parsing_service_response_queue");
public QueueRenameMigration23(AmqpAdmin amqpAdmin) {
super(NAME, VERSION);
this.amqpAdmin = amqpAdmin;
}
@Override
protected void migrate() {
log.info("Migration: Deleting queues...");
queueNames.forEach(amqpAdmin::deleteQueue);
}
}
// TODO: run in a migration after 4.2 -> 4.3
//
//@Slf4j
//@Setter
//@Service
//public class QueueRenameMigration23 extends Migration {
//
// private final AmqpAdmin amqpAdmin;
//
// private static final String NAME = "Migration for renaming of most queues";
// private static final long VERSION = 23;
// private static final List<String> queueNames = List.of("analysis_flag_calculation_queue",
// "cv_analysis_dead_letter_queue",
// "cv_analysis_request_queue",
// "cv_analysis_response_queue",
// "deleteFromIndexDLQ",
// "deleteFromIndexQueue",
// "downloadDLQ",
// "downloadQueue",
// "download_compression_dlq",
// "download_compression_queue",
// "entity_dead_letter_queue",
// "entity_request_queue",
// "entity_response_queue",
// "exportDownloadDLQ",
// "exportDownloadQueue",
// "image_dead_letter_queue",
// "image_request_queue",
// "image_response_queue",
// "indexingDQL",
// "indexingQueue",
// "layout_parsing_dead_letter_queue",
// "layout_parsing_request_queue",
// "layout_parsing_response_queue",
// "migrationDLQ",
// "migrationQueue",
// "migrationResponseQueue",
// "mongo-tenant-created",
// "mongo-tenant-created-dlq",
// "ocr_dead_letter_queue",
// "ocr_request_queue",
// "ocr_response_queue",
// "ocr_status_update_dead_letter_queue",
// "ocr_status_update_response_queue",
// "pdftron_dlq",
// "pdftron_queue",
// "pdftron_result_queue",
// "persistence-service-user-created-queue",
// "persistence-service-user-deleted-queue",
// "persistence-service-user-events-dql",
// "persistence-service-user-own-profile-updated-queue",
// "persistence-service-user-roles-updated-queue",
// "persistence-service-user-status-changed-queue",
// "persistence-service-user-updated-queue",
// "preprocessingDLQ",
// "preprocessingQueue",
// "redactionAnalysisResponseQueue",
// "redactionDQL",
// "redactionPriorityQueue",
// "redactionQueue",
// "reportDLQ",
// "reportQueue",
// "reportResultDLQ",
// "reportResultQueue",
// "tenant-created",
// "tenant-created-dlq",
// "tenant-delete-dlq",
// "tenant-delete-queue",
// "tenant-updated-queue",
// "tenant-updated-dlq",
// "tenant-sync",
// "tenant-sync-dlq",
// "visual_layout_parsing_service_dead_letter_queue",
// "visual_layout_parsing_service_queue",
// "visual_layout_parsing_service_response_queue");
//
//
// public QueueRenameMigration23(AmqpAdmin amqpAdmin) {
//
// super(NAME, VERSION);
// this.amqpAdmin = amqpAdmin;
// }
//
//
// @Override
// protected void migrate() {
//
// log.info("Migration: Deleting queues...");
// for (String queueName : queueNames) {
// log.info("Migration: Deleting queue {}", queueName);
// amqpAdmin.deleteQueue(queueName);
// }
//
// }
//
//}

View File

@ -140,7 +140,7 @@ public class DownloadService {
private void addToDownloadQueue(DownloadJob downloadJob, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_REQUEST_EXCHANGE, TenantContext.getTenantId(), downloadJob, message -> {
message.getMessageProperties().setPriority(priority);
return message;
});

View File

@ -1,6 +1,6 @@
package com.iqser.red.service.persistence.management.v1.processor.service;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_SERVICE_QUEUE;
import static com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration.CHUNKING_REQUEST_EXCHANGE;
import java.time.OffsetDateTime;
import java.util.ArrayList;
@ -299,14 +299,14 @@ public class FileStatusService {
if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.DOCUMENT_CHUNKS)) {
var chunkingRequest = ChunkingRequestFactory.createChunkingRequest(dossierId, fileId);
setStatusFullProcessing(fileId);
log.info("Sending request to {} for {}/{}", CHUNKING_SERVICE_QUEUE, dossierId, fileId);
rabbitTemplate.convertAndSend(CHUNKING_SERVICE_QUEUE, chunkingRequest);
log.info("Sending request to {} for {}/{}", CHUNKING_REQUEST_EXCHANGE, dossierId, fileId);
rabbitTemplate.convertAndSend(CHUNKING_REQUEST_EXCHANGE, TenantContext.getTenantId(), chunkingRequest);
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
return;
}
if (settings.isLlmNerServiceEnabled() && !fileManagementStorageService.objectExists(dossierId, fileId, FileType.LLM_NER_ENTITIES)) {
log.info("Add file: {} from dossier {} to NER queue", fileId, dossierId);
log.info("Add file: {} from dossier {} to LLM NER queue", fileId, dossierId);
addToLLMNerQueue(dossierId, fileId);
sendReadOnlyAnalysisEvent(dossierId, fileId, fileEntity);
return;
@ -426,7 +426,7 @@ public class FileStatusService {
setStatusPreProcessingQueued(fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.PREPROCESSING_EXCHANGE, TenantContext.getTenantId(), processUntouchedDocumentRequest);
rabbitTemplate.convertAndSend(MessagingConfiguration.PREPROCESSING_REQUEST_EXCHANGE, TenantContext.getTenantId(), processUntouchedDocumentRequest);
}
@ -531,7 +531,8 @@ public class FileStatusService {
protected void addToLLMNerQueue(String dossierId, String fileId) {
setStatusNerAnalyzing(fileId);
rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_SERVICE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.LLM_NER_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
LlmNerMessage.builder()
.identifier(QueueMessageIdentifierService.buildIdentifier(dossierId, fileId, false))
.chunksStorageId(StorageIdUtils.getStorageId(dossierId, fileId, FileType.DOCUMENT_CHUNKS))

View File

@ -67,7 +67,7 @@ public class IndexingService {
public void addToIndexingQueue(IndexMessageType indexMessageType, String dossierTemplateId, String dossierId, String fileId, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_EXCHANGE,
rabbitTemplate.convertAndSend(MessagingConfiguration.INDEXING_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
IndexMessage.builder().messageType(indexMessageType).dossierTemplateId(dossierTemplateId).dossierId(dossierId).fileId(fileId).build(),
message -> {
@ -79,7 +79,7 @@ public class IndexingService {
public void addToDeleteFromIndexQueue(String dossierId, String fileId, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_EXCHANGE,
rabbitTemplate.convertAndSend(MessagingConfiguration.DELETE_FROM_INDEX_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
IndexMessage.builder().dossierId(dossierId).fileId(fileId).build(),
message -> {

View File

@ -30,7 +30,7 @@ public class DownloadCompressingService {
if (updated == 1) {
websocketService.sendDownloadEvent(downloadId, userId, DownloadStatusValue.COMPRESSING);
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_EXCHANGE,
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
DownloadJob.builder().storageId(downloadId).userId(userId).build());
}

View File

@ -75,7 +75,7 @@ public class DownloadDLQMessageReceiver {
@RabbitListener(queues = MessagingConfiguration.REPORT_RESPONSE_DLQ)
public void handleReportResponseDlqMessage(ReportResultMessage reportResultMessage) {
log.warn("Handling report request in DLQ userId: {} storageId: {} - setting status to error!", reportResultMessage.getUserId(), reportResultMessage.getDownloadId());
log.warn("Handling report response in DLQ userId: {} storageId: {} - setting status to error!", reportResultMessage.getUserId(), reportResultMessage.getDownloadId());
setDownloadFailed(reportResultMessage.getUserId(), reportResultMessage.getDownloadId());
}

View File

@ -2,7 +2,6 @@ package com.iqser.red.service.persistence.management.v1.processor.service.job;
import static com.iqser.red.service.persistence.management.v1.processor.utils.TenantUtils.isTenantReadyForPersistence;
import java.time.OffsetDateTime;
import java.util.Set;
import org.quartz.DisallowConcurrentExecution;
@ -51,7 +50,7 @@ import lombok.extern.slf4j.Slf4j;
log.debug("Found {} files which require analysis flag calculation. Tenant: {}", fileIdentifiers.size(), tenant.getDisplayName());
fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_EXCHANGE,
fileIdentifiers.forEach(fileIdentifier -> rabbitTemplate.convertAndSend(MessagingConfiguration.ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE,
TenantContext.getTenantId(),
new AnalysisFlagCalculationMessage(fileIdentifier.dossierId(),
fileIdentifier.fileId())));

View File

@ -27,34 +27,36 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ChunkingMessageReceiver {
public static final String CHUNKING_RESPONSE_LISTENER_ID = "chunking-response-listener";
FileStatusService fileStatusService;
ObservationRegistry observationRegistry;
FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
ObjectMapper mapper;
@RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE)
@RabbitListener(id = CHUNKING_RESPONSE_LISTENER_ID)
public void receive(Message message) {
PyInfraRequest pyInfraRequest = parsePyInfraRequest(message);
addFileIdToTrace(pyInfraRequest.getFileUuid());
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_RESPONSE_QUEUE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_RESPONSE_EXCHANGE, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
fileStatusService.setStatusAnalyse(pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid(), false);
}
@RabbitListener(queues = MessagingConfiguration.CHUNKING_SERVICE_DLQ)
@RabbitListener(queues = MessagingConfiguration.CHUNKING_DLQ)
public void handleDLQ(Message message) {
PyInfraRequest pyInfraRequest = parsePyInfraRequest(message);
addFileIdToTrace(pyInfraRequest.getFileUuid());
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_SERVICE_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CHUNKING_DLQ, pyInfraRequest.getDossierUuid(), pyInfraRequest.getFileUuid());
fileStatusProcessingUpdateService.analysisFailed(pyInfraRequest.getDossierUuid(),
pyInfraRequest.getFileUuid(),
new FileErrorInfo("chunking service failed",
MessagingConfiguration.CHUNKING_SERVICE_DLQ,
MessagingConfiguration.CHUNKING_DLQ,
"junker-chunker-service",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}

View File

@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
public class NerMessageReceiver {
public static final String ENTITY_RESPONSE_LISTENER_ID = "entity-response-listener";
public static final String LLM_ENTITY_RESPONSE_LISTENER_ID = "llm-entity-response-listener";
private final FileStatusService fileStatusService;
private final ObjectMapper objectMapper;
@ -54,14 +55,14 @@ public class NerMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE)
@RabbitListener(id = LLM_ENTITY_RESPONSE_LISTENER_ID)
public void receiveLLM(LlmNerResponseMessage message) {
String dossierId = QueueMessageIdentifierService.parseDossierId(message.getIdentifier());
String fileId = QueueMessageIdentifierService.parseFileId(message.getIdentifier());
addFileIdToTrace(fileId);
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
log.info("Received message from {} for dossierId {} and fileId {}", LLM_ENTITY_RESPONSE_LISTENER_ID, dossierId, fileId);
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
}
@ -85,7 +86,7 @@ public class NerMessageReceiver {
}
@RabbitListener(queues = MessagingConfiguration.LLM_NER_SERVICE_DLQ)
@RabbitListener(queues = MessagingConfiguration.LLM_NER_DLQ)
public void handleLLMDLQMessage(Message failedMessage) throws IOException {
LlmNerMessage entityResponse = objectMapper.readValue(failedMessage.getBody(), LlmNerMessage.class);
@ -95,11 +96,11 @@ public class NerMessageReceiver {
addFileIdToTrace(fileId);
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_SERVICE_DLQ, dossierId, fileId);
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.LLM_NER_DLQ, dossierId, fileId);
fileStatusProcessingUpdateService.analysisFailed(dossierId,
fileId,
new FileErrorInfo("llm ner service failed",
MessagingConfiguration.LLM_NER_SERVICE_DLQ,
MessagingConfiguration.LLM_NER_DLQ,
"entity-recognition-service-v3",
OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)));
}

View File

@ -57,7 +57,7 @@ public class OCRProcessingMessageReceiver {
response.getNumberOfOCRedPages());
}
log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE);
log.debug("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_EXCHANGE);
}

View File

@ -8,10 +8,12 @@ import static com.iqser.red.service.persistence.management.v1.processor.service.
import static com.iqser.red.service.persistence.management.v1.processor.service.download.RedactionResultMessageReceiver.PDFTRON_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AnalysisFlagsCalculationMessageReceiver.ANALYSIS_FLAG_CALCULATION_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.AzureNerMessageReceiver.AZURE_ENTITY_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.ChunkingMessageReceiver.CHUNKING_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.CvAnalysisMessageReceiver.CV_ANALYSIS_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.ImageMessageReceiver.IMAGE_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.LayoutParsingFinishedMessageReceiver.LAYOUT_PARSING_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.NerMessageReceiver.ENTITY_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.NerMessageReceiver.LLM_ENTITY_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_RESPONSE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.OCRProcessingMessageReceiver.OCR_STATUS_UPDATE_LISTENER_ID;
import static com.iqser.red.service.persistence.management.v1.processor.service.queue.RedactionAnalysisResponseReceiver.REDACTION_RESPONSE_LISTENER_ID;
@ -48,26 +50,26 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece
return Set.of(TenantQueueConfiguration.builder()
.listenerId(DOWNLOAD_LISTENER_ID)
.exchangeName(DOWNLOAD_EXCHANGE)
.queuePrefix(DOWNLOAD_QUEUE_PREFIX)
.exchangeName(DOWNLOAD_REQUEST_EXCHANGE)
.queuePrefix(DOWNLOAD_REQUEST_QUEUE_PREFIX)
.dlqName(DOWNLOAD_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(DOWNLOAD_COMPRESSION_LISTENER_ID)
.exchangeName(DOWNLOAD_COMPRESSION_EXCHANGE)
.queuePrefix(DOWNLOAD_COMPRESSION_QUEUE_PREFIX)
.exchangeName(DOWNLOAD_COMPRESSION_REQUEST_EXCHANGE)
.queuePrefix(DOWNLOAD_COMPRESSION_REQUEST_QUEUE_PREFIX)
.dlqName(DOWNLOAD_COMPRESSION_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(EXPORT_DOWNLOAD_LISTENER_ID)
.exchangeName(EXPORT_DOWNLOAD_EXCHANGE)
.queuePrefix(EXPORT_DOWNLOAD_QUEUE_PREFIX)
.exchangeName(EXPORT_DOWNLOAD_REQUEST_EXCHANGE)
.queuePrefix(EXPORT_DOWNLOAD_REQUEST_QUEUE_PREFIX)
.dlqName(EXPORT_DOWNLOAD_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(ANALYSIS_FLAG_CALCULATION_LISTENER_ID)
.exchangeName(ANALYSIS_FLAG_CALCULATION_EXCHANGE)
.queuePrefix(ANALYSIS_FLAG_CALCULATION_QUEUE_PREFIX)
.exchangeName(ANALYSIS_FLAG_CALCULATION_REQUEST_EXCHANGE)
.queuePrefix(ANALYSIS_FLAG_CALCULATION_REQUEST_QUEUE_PREFIX)
.build(),
TenantQueueConfiguration.builder()
.listenerId(REDACTION_RESPONSE_LISTENER_ID)
@ -96,8 +98,8 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece
.build(),
TenantQueueConfiguration.builder()
.listenerId(OCR_STATUS_UPDATE_LISTENER_ID)
.exchangeName(OCR_STATUS_UPDATE_EXCHANGE)
.queuePrefix(OCR_STATUS_UPDATE_QUEUE_PREFIX)
.exchangeName(OCR_STATUS_UPDATE_RESPONSE_EXCHANGE)
.queuePrefix(OCR_STATUS_UPDATE_RESPONSE_QUEUE_PREFIX)
.dlqName(OCR_STATUS_UPDATE_DLQ)
.build(),
TenantQueueConfiguration.builder()
@ -118,6 +120,18 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece
.queuePrefix(AZURE_ENTITY_RESPONSE_QUEUE_PREFIX)
.dlqName(AZURE_ENTITY_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(CHUNKING_RESPONSE_LISTENER_ID)
.exchangeName(CHUNKING_RESPONSE_EXCHANGE)
.queuePrefix(CHUNKING_RESPONSE_QUEUE_PREFIX)
.dlqName(CHUNKING_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(LLM_ENTITY_RESPONSE_LISTENER_ID)
.exchangeName(LLM_NER_RESPONSE_EXCHANGE)
.queuePrefix(LLM_NER_RESPONSE_QUEUE_PREFIX)
.dlqName(LLM_NER_DLQ)
.build(),
TenantQueueConfiguration.builder()
.listenerId(CV_ANALYSIS_RESPONSE_LISTENER_ID)
.exchangeName(CV_ANALYSIS_RESPONSE_EXCHANGE)

View File

@ -9,7 +9,9 @@ dependencies {
api(project(":persistence-service-shared-api-v1"))
api("com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.2")
api("com.google.guava:guava:31.1-jre")
api("com.knecon.fforesight:mongo-database-commons:0.11.0")
api("com.knecon.fforesight:mongo-database-commons:0.12.0") {
exclude(group = "com.knecon.fforesight", module = "tenant-commons")
}
api("org.springframework.boot:spring-boot-starter-data-mongodb:${springBootStarterVersion}")
api("org.springframework.boot:spring-boot-starter-validation:3.1.3")
testImplementation("com.iqser.red.commons:test-commons:2.1.0")