Compare commits
10 Commits
release/2.
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2e8020c9b | ||
|
|
e7f84c28d6 | ||
|
|
779acf6202 | ||
|
|
ff626afe78 | ||
|
|
ebf13d3be1 | ||
|
|
0693ddf197 | ||
|
|
c6d0361678 | ||
|
|
380b62333c | ||
|
|
f7a549c1a3 | ||
|
|
55e0386e31 |
@ -22,13 +22,13 @@ configurations {
|
|||||||
val springBootStarterVersion = "3.1.5"
|
val springBootStarterVersion = "3.1.5"
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
api("com.knecon.fforesight:tenant-commons:0.21.0")
|
api("com.knecon.fforesight:tenant-commons:0.30.0")
|
||||||
api("com.knecon.fforesight:tracing-commons:0.5.0")
|
api("com.knecon.fforesight:tracing-commons:0.5.0")
|
||||||
api("com.knecon.fforesight:lifecycle-commons:0.6.0")
|
api("com.knecon.fforesight:lifecycle-commons:0.6.0")
|
||||||
api("com.google.guava:guava:31.1-jre")
|
api("com.google.guava:guava:31.1-jre")
|
||||||
api("com.iqser.red.commons:storage-commons:2.45.0")
|
api("com.iqser.red.commons:storage-commons:2.45.0")
|
||||||
api(project(":search-service-api-v1"))
|
api(project(":search-service-api-v1"))
|
||||||
api("com.iqser.red.service:persistence-service-internal-api-v1:2.465.78")
|
api("com.iqser.red.service:persistence-service-internal-api-v1:2.576.0-RED10106.0")
|
||||||
api("com.iqser.red.commons:spring-commons:2.1.0")
|
api("com.iqser.red.commons:spring-commons:2.1.0")
|
||||||
api("com.iqser.red.commons:metric-commons:2.1.0")
|
api("com.iqser.red.commons:metric-commons:2.1.0")
|
||||||
api("com.iqser.red.commons:jackson-commons:2.1.0")
|
api("com.iqser.red.commons:jackson-commons:2.1.0")
|
||||||
|
|||||||
@ -1,7 +1,8 @@
|
|||||||
package com.iqser.red.service.search.v1.server.queue;
|
package com.iqser.red.service.search.v1.server.configuration;
|
||||||
|
|
||||||
import org.springframework.amqp.core.Binding;
|
import org.springframework.amqp.core.Binding;
|
||||||
import org.springframework.amqp.core.BindingBuilder;
|
import org.springframework.amqp.core.BindingBuilder;
|
||||||
|
import org.springframework.amqp.core.DirectExchange;
|
||||||
import org.springframework.amqp.core.Queue;
|
import org.springframework.amqp.core.Queue;
|
||||||
import org.springframework.amqp.core.QueueBuilder;
|
import org.springframework.amqp.core.QueueBuilder;
|
||||||
import org.springframework.amqp.core.TopicExchange;
|
import org.springframework.amqp.core.TopicExchange;
|
||||||
@ -16,64 +17,46 @@ import lombok.RequiredArgsConstructor;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class MessagingConfiguration {
|
public class MessagingConfiguration {
|
||||||
|
|
||||||
public static final String INDEXING_QUEUE = "indexingQueue";
|
public static final String INDEXING_REQUEST_QUEUE_PREFIX = "indexing_request";
|
||||||
public static final String INDEXING_DQL = "indexingDQL";
|
public static final String INDEXING_REQUEST_EXCHANGE = "indexing_request_exchange";
|
||||||
|
public static final String INDEXING_DLQ = "indexing_error";
|
||||||
|
|
||||||
public static final String DELETE_FROM_INDEX_QUEUE = "deleteFromIndexQueue";
|
public static final String DELETE_FROM_INDEX_REQUEST_QUEUE_PREFIX = "delete_from_index_request";
|
||||||
public static final String DELETE_FROM_INDEX_DLQ = "deleteFromIndexDLQ";
|
public static final String DELETE_FROM_INDEX_REQUEST_EXCHANGE = "delete_from_index_request_exchange";
|
||||||
|
public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_error";
|
||||||
|
|
||||||
public static final String X_ERROR_INFO_HEADER = "x-error-message";
|
public static final String X_ERROR_INFO_HEADER = "x-error-message";
|
||||||
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
|
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
|
||||||
|
|
||||||
@Value("${fforesight.multitenancy.tenant-delete-queue:tenant-delete-queue}")
|
@Value("${fforesight.multitenancy.tenant-delete-queue:search-service-tenant-delete}")
|
||||||
private String tenantDeleteEventQueueName;
|
private String tenantDeleteEventQueueName;
|
||||||
@Value("${fforesight.multitenancy.tenant-delete-dlq:tenant-delete-dlq}")
|
@Value("${fforesight.multitenancy.tenant-delete-dlq:search-service-tenant-delete-error}")
|
||||||
private String tenantDeleteDLQName;
|
private String tenantDeleteDLQName;
|
||||||
|
|
||||||
@Value("${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
@Value("${fforesight.multitenancy.tenant-updated-queue:search-service-tenant-updated}")
|
||||||
private String tenantUpdatedEventQueueName;
|
private String tenantUpdatedEventQueueName;
|
||||||
@Value("${fforesight.multitenancy.tenant-updated-dlq:tenant-updated-dlq}")
|
@Value("${fforesight.multitenancy.tenant-updated-dlq:search-service-tenant-updated-error}")
|
||||||
private String tenantUpdatedDLQName;
|
private String tenantUpdatedDLQName;
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Binding tenantExchangeDeleteBinding(@Qualifier("tenantUserManagementTenantDeleteQueue") Queue tenantUserManagementTenantDeleteQueue,
|
public DirectExchange indexingRequestExchange() {
|
||||||
@Qualifier("tenantExchange") TopicExchange tenantExchange) {
|
|
||||||
|
|
||||||
return BindingBuilder.bind(tenantUserManagementTenantDeleteQueue).to(tenantExchange).with("tenant.delete");
|
return new DirectExchange(INDEXING_REQUEST_EXCHANGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Binding tenantExchangeUpdatedBinding(@Qualifier("tenantUserManagementTenantUpdatedQueue") Queue tenantUserManagementTenantUpdatedQueue,
|
public Queue indexingDLQ() {
|
||||||
@Qualifier("tenantExchange") TopicExchange tenantExchange) {
|
|
||||||
|
|
||||||
return BindingBuilder.bind(tenantUserManagementTenantUpdatedQueue).to(tenantExchange).with("tenant.updated");
|
return QueueBuilder.durable(INDEXING_DLQ).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Queue indexingQueue() {
|
public DirectExchange deleteFromIndexRequestExchange() {
|
||||||
|
|
||||||
return QueueBuilder.durable(INDEXING_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", INDEXING_DQL).maxPriority(2).build();
|
return new DirectExchange(DELETE_FROM_INDEX_REQUEST_EXCHANGE);
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Queue indexingDeadLetterQueue() {
|
|
||||||
|
|
||||||
return QueueBuilder.durable(INDEXING_DQL).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Queue deleteFromIndexQueue() {
|
|
||||||
|
|
||||||
return QueueBuilder.durable(DELETE_FROM_INDEX_QUEUE)
|
|
||||||
.withArgument("x-dead-letter-exchange", "")
|
|
||||||
.withArgument("x-dead-letter-routing-key", DELETE_FROM_INDEX_DLQ)
|
|
||||||
.maxPriority(2)
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -83,13 +66,12 @@ public class MessagingConfiguration {
|
|||||||
return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build();
|
return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tentant Delete Event Queue
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Binding tenantExchangeDeleteBinding(@Qualifier("tenantUserManagementTenantDeleteQueue") Queue tenantUserManagementTenantDeleteQueue,
|
||||||
|
@Qualifier("tenantExchange") TopicExchange tenantExchange) {
|
||||||
|
|
||||||
@Bean(name = "tenantExchange")
|
return BindingBuilder.bind(tenantUserManagementTenantDeleteQueue).to(tenantExchange).with("tenant.delete");
|
||||||
TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name}") String tenantExchangeName) {
|
|
||||||
|
|
||||||
return new TopicExchange(tenantExchangeName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -110,6 +92,14 @@ public class MessagingConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Binding tenantExchangeUpdatedBinding(@Qualifier("tenantUserManagementTenantUpdatedQueue") Queue tenantUserManagementTenantUpdatedQueue,
|
||||||
|
@Qualifier("tenantExchange") TopicExchange tenantExchange) {
|
||||||
|
|
||||||
|
return BindingBuilder.bind(tenantUserManagementTenantUpdatedQueue).to(tenantExchange).with("tenant.updated");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Bean("tenantUserManagementTenantUpdatedQueue")
|
@Bean("tenantUserManagementTenantUpdatedQueue")
|
||||||
public Queue tenantUpdatedQueue() {
|
public Queue tenantUpdatedQueue() {
|
||||||
|
|
||||||
@ -0,0 +1,11 @@
|
|||||||
|
package com.iqser.red.service.search.v1.server.configuration;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@ -1,14 +1,12 @@
|
|||||||
package com.iqser.red.service.search.v1.server.queue;
|
package com.iqser.red.service.search.v1.server.queue;
|
||||||
|
|
||||||
import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguration.DELETE_FROM_INDEX_DLQ;
|
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.DELETE_FROM_INDEX_DLQ;
|
||||||
import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguration.DELETE_FROM_INDEX_QUEUE;
|
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.INDEXING_DLQ;
|
||||||
import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguration.INDEXING_DQL;
|
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.INDEXING_REQUEST_EXCHANGE;
|
||||||
import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguration.INDEXING_QUEUE;
|
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.X_ERROR_INFO_HEADER;
|
||||||
import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguration.X_ERROR_INFO_HEADER;
|
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER;
|
||||||
import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -20,7 +18,6 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.Dossier;
|
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.Dossier;
|
||||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||||
@ -38,6 +35,7 @@ import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
|
|||||||
import com.iqser.red.service.search.v1.server.service.IndexDocumentConverterService;
|
import com.iqser.red.service.search.v1.server.service.IndexDocumentConverterService;
|
||||||
import com.iqser.red.service.search.v1.server.service.IndexInformationService;
|
import com.iqser.red.service.search.v1.server.service.IndexInformationService;
|
||||||
import com.iqser.red.service.search.v1.server.service.TextStorageService;
|
import com.iqser.red.service.search.v1.server.service.TextStorageService;
|
||||||
|
import com.knecon.fforesight.tenantcommons.TenantContext;
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
@ -48,6 +46,9 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class IndexingMessageReceiver {
|
public class IndexingMessageReceiver {
|
||||||
|
|
||||||
|
public static final String INDEXING_LISTENER_ID = "indexing-listener";
|
||||||
|
public static final String DELETE_FROM_INDEX_LISTENER_ID = "delete-from-index-listener";
|
||||||
|
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
private final TextStorageService textStorageService;
|
private final TextStorageService textStorageService;
|
||||||
private final FileStatusClient fileStatusClient;
|
private final FileStatusClient fileStatusClient;
|
||||||
@ -65,7 +66,7 @@ public class IndexingMessageReceiver {
|
|||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
@RabbitListener(queues = INDEXING_QUEUE)
|
@RabbitListener(id = INDEXING_LISTENER_ID)
|
||||||
public void receiveIndexingRequest(Message message) {
|
public void receiveIndexingRequest(Message message) {
|
||||||
|
|
||||||
var indexRequest = objectMapper.readValue(message.getBody(), IndexMessage.class);
|
var indexRequest = objectMapper.readValue(message.getBody(), IndexMessage.class);
|
||||||
@ -73,8 +74,8 @@ public class IndexingMessageReceiver {
|
|||||||
// This prevents from endless retries oom errors.
|
// This prevents from endless retries oom errors.
|
||||||
if (message.getMessageProperties().isRedelivered()) {
|
if (message.getMessageProperties().isRedelivered()) {
|
||||||
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with dossierId: %s and fileId: %s, do not retry.",
|
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with dossierId: %s and fileId: %s, do not retry.",
|
||||||
indexRequest.getDossierId(),
|
indexRequest.getDossierId(),
|
||||||
indexRequest.getFileId()));
|
indexRequest.getFileId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -105,12 +106,11 @@ public class IndexingMessageReceiver {
|
|||||||
fileStatus = fileStatusClient.getFileStatus(indexRequest.getDossierId(), indexRequest.getFileId());
|
fileStatus = fileStatusClient.getFileStatus(indexRequest.getDossierId(), indexRequest.getFileId());
|
||||||
dossier = dossierClient.getDossierById(indexRequest.getDossierId(), true, true);
|
dossier = dossierClient.getDossierById(indexRequest.getDossierId(), true, true);
|
||||||
|
|
||||||
if(documentUpdateService.documentExists(indexRequest.getFileId())) {
|
if(documentUpdateService.documentExists(indexRequest.getFileId())) {var indexUpdateDocument = indexDocumentConverterService.convertUpdateDocument(fileStatus.getAssignee(),
|
||||||
var indexUpdateDocument = indexDocumentConverterService.convertUpdateDocument(fileStatus.getAssignee(),
|
dossier.getSoftDeletedTime() != null,
|
||||||
dossier.getSoftDeletedTime() != null,
|
dossier.getArchivedTime() != null,
|
||||||
dossier.getArchivedTime() != null,
|
fileStatus.getWorkflowStatus().name(),
|
||||||
fileStatus.getWorkflowStatus().name(),
|
fileStatus.getFileAttributes());
|
||||||
fileStatus.getFileAttributes());
|
|
||||||
|
|
||||||
documentUpdateService.updateDocument(indexRequest.getFileId(), indexUpdateDocument);
|
documentUpdateService.updateDocument(indexRequest.getFileId(), indexUpdateDocument);
|
||||||
log.info("Successfully updated {}", indexRequest);
|
log.info("Successfully updated {}", indexRequest);
|
||||||
@ -133,7 +133,7 @@ public class IndexingMessageReceiver {
|
|||||||
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
@RabbitListener(queues = INDEXING_DQL)
|
@RabbitListener(queues = INDEXING_DLQ)
|
||||||
public void receiveIndexingRequestDQL(Message in) throws IOException {
|
public void receiveIndexingRequestDQL(Message in) throws IOException {
|
||||||
|
|
||||||
var indexRequest = objectMapper.readValue(in.getBody(), IndexMessage.class);
|
var indexRequest = objectMapper.readValue(in.getBody(), IndexMessage.class);
|
||||||
@ -142,13 +142,15 @@ public class IndexingMessageReceiver {
|
|||||||
String errorMessage = errorLog + in.getMessageProperties().getHeader(X_ERROR_INFO_HEADER);
|
String errorMessage = errorLog + in.getMessageProperties().getHeader(X_ERROR_INFO_HEADER);
|
||||||
OffsetDateTime timestamp = in.getMessageProperties().getHeader(X_ERROR_INFO_TIMESTAMP_HEADER);
|
OffsetDateTime timestamp = in.getMessageProperties().getHeader(X_ERROR_INFO_TIMESTAMP_HEADER);
|
||||||
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
||||||
fileStatusProcessingUpdateClient.indexingFailed(indexRequest.getDossierId(), indexRequest.getFileId(), new FileErrorInfo(errorMessage, INDEXING_DQL, "search-service", timestamp));
|
fileStatusProcessingUpdateClient.indexingFailed(indexRequest.getDossierId(),
|
||||||
|
indexRequest.getFileId(),
|
||||||
|
new FileErrorInfo(errorMessage, INDEXING_DLQ, "search-service", timestamp));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
@RabbitListener(queues = DELETE_FROM_INDEX_QUEUE)
|
@RabbitListener(id = DELETE_FROM_INDEX_LISTENER_ID)
|
||||||
public void receiveDeleteDocumentRequest(Message in) throws IOException {
|
public void receiveDeleteDocumentRequest(Message in) throws IOException {
|
||||||
|
|
||||||
var indexRequest = objectMapper.readValue(in.getBody(), IndexMessage.class);
|
var indexRequest = objectMapper.readValue(in.getBody(), IndexMessage.class);
|
||||||
@ -177,7 +179,9 @@ public class IndexingMessageReceiver {
|
|||||||
String errorMessage = errorLog + in.getMessageProperties().getHeader(X_ERROR_INFO_HEADER);
|
String errorMessage = errorLog + in.getMessageProperties().getHeader(X_ERROR_INFO_HEADER);
|
||||||
OffsetDateTime timestamp = in.getMessageProperties().getHeader(X_ERROR_INFO_TIMESTAMP_HEADER);
|
OffsetDateTime timestamp = in.getMessageProperties().getHeader(X_ERROR_INFO_TIMESTAMP_HEADER);
|
||||||
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
||||||
fileStatusProcessingUpdateClient.indexingFailed(indexRequest.getDossierId(), indexRequest.getFileId(), new FileErrorInfo(errorMessage, INDEXING_DQL, "search-service", timestamp));
|
fileStatusProcessingUpdateClient.indexingFailed(indexRequest.getDossierId(),
|
||||||
|
indexRequest.getFileId(),
|
||||||
|
new FileErrorInfo(errorMessage, INDEXING_DLQ, "search-service", timestamp));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -188,15 +192,15 @@ public class IndexingMessageReceiver {
|
|||||||
Text text = textStorageService.getText(dossier.getId(), file.getId());
|
Text text = textStorageService.getText(dossier.getId(), file.getId());
|
||||||
|
|
||||||
var indexDocument = indexDocumentConverterService.convert(dossier.getDossierTemplateId(),
|
var indexDocument = indexDocumentConverterService.convert(dossier.getDossierTemplateId(),
|
||||||
dossier.getId(),
|
dossier.getId(),
|
||||||
file.getId(),
|
file.getId(),
|
||||||
file.getFilename(),
|
file.getFilename(),
|
||||||
text,
|
text,
|
||||||
file.getAssignee(),
|
file.getAssignee(),
|
||||||
dossier.getSoftDeletedTime() != null,
|
dossier.getSoftDeletedTime() != null,
|
||||||
dossier.getArchivedTime() != null,
|
dossier.getArchivedTime() != null,
|
||||||
file.getWorkflowStatus(),
|
file.getWorkflowStatus(),
|
||||||
file.getFileAttributes());
|
file.getFileAttributes());
|
||||||
|
|
||||||
documentIndexService.indexDocument(indexDocument);
|
documentIndexService.indexDocument(indexDocument);
|
||||||
fileStatusProcessingUpdateClient.indexingSuccessful(dossier.getId(), file.getId());
|
fileStatusProcessingUpdateClient.indexingSuccessful(dossier.getId(), file.getId());
|
||||||
@ -219,12 +223,13 @@ public class IndexingMessageReceiver {
|
|||||||
|
|
||||||
for (FileModel file : files) {
|
for (FileModel file : files) {
|
||||||
log.info("Will add dossier {} file {} to index queue", dossierId, file.getId());
|
log.info("Will add dossier {} file {} to index queue", dossierId, file.getId());
|
||||||
rabbitTemplate.convertAndSend(INDEXING_QUEUE,
|
rabbitTemplate.convertAndSend(INDEXING_REQUEST_EXCHANGE,
|
||||||
IndexMessage.builder().messageType(IndexMessageType.INSERT).dossierId(dossierId).fileId(file.getId()).build(),
|
TenantContext.getTenantId(),
|
||||||
message -> {
|
IndexMessage.builder().messageType(IndexMessageType.INSERT).dossierId(dossierId).fileId(file.getId()).build(),
|
||||||
message.getMessageProperties().setPriority(99);
|
message -> {
|
||||||
return message;
|
message.getMessageProperties().setPriority(99);
|
||||||
});
|
return message;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,74 @@
|
|||||||
|
package com.iqser.red.service.search.v1.server.queue;
|
||||||
|
|
||||||
|
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.*;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import com.knecon.fforesight.tenantcommons.TenantProvider;
|
||||||
|
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
|
||||||
|
import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration;
|
||||||
|
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
|
||||||
|
import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService;
|
||||||
|
import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver {
|
||||||
|
|
||||||
|
public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) {
|
||||||
|
|
||||||
|
super(rabbitQueueService, tenantProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Set<TenantQueueConfiguration> getTenantQueueConfigs() {
|
||||||
|
|
||||||
|
return Set.of(TenantQueueConfiguration.builder()
|
||||||
|
.listenerId(IndexingMessageReceiver.INDEXING_LISTENER_ID)
|
||||||
|
.exchangeName(INDEXING_REQUEST_EXCHANGE)
|
||||||
|
.queuePrefix(INDEXING_REQUEST_QUEUE_PREFIX)
|
||||||
|
.dlqName(INDEXING_DLQ)
|
||||||
|
.arguments(Map.of("x-max-priority", 2))
|
||||||
|
.build(),
|
||||||
|
TenantQueueConfiguration.builder()
|
||||||
|
.listenerId(IndexingMessageReceiver.DELETE_FROM_INDEX_LISTENER_ID)
|
||||||
|
.exchangeName(DELETE_FROM_INDEX_REQUEST_EXCHANGE)
|
||||||
|
.queuePrefix(DELETE_FROM_INDEX_REQUEST_QUEUE_PREFIX)
|
||||||
|
.dlqName(DELETE_FROM_INDEX_DLQ)
|
||||||
|
.arguments(Map.of("x-max-priority", 2))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@EventListener(ApplicationReadyEvent.class)
|
||||||
|
public void onApplicationReady() {
|
||||||
|
|
||||||
|
System.out.println("application ready invoked");
|
||||||
|
super.initializeQueues();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@RabbitHandler
|
||||||
|
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}")
|
||||||
|
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
|
||||||
|
|
||||||
|
super.reactToTenantCreation(tenantCreatedEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@RabbitHandler
|
||||||
|
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}")
|
||||||
|
public void reactToTenantDeletion(TenantResponse tenantResponse) {
|
||||||
|
|
||||||
|
super.reactToTenantDeletion(tenantResponse);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -21,7 +21,7 @@ public class UpdatedTenantMessageReceiver {
|
|||||||
private final IndexQueryService indexQueryService;
|
private final IndexQueryService indexQueryService;
|
||||||
private final IndexDeleteService indexDeleteService;
|
private final IndexDeleteService indexDeleteService;
|
||||||
|
|
||||||
@Value("${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
@Value("${fforesight.multitenancy.tenant-updated-queue:search-service-tenant-updated}")
|
||||||
private String tenantUpdatedQueue;
|
private String tenantUpdatedQueue;
|
||||||
|
|
||||||
|
|
||||||
@ -32,7 +32,7 @@ public class UpdatedTenantMessageReceiver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@RabbitListener(queues = "${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
@RabbitListener(queues = "${fforesight.multitenancy.tenant-updated-queue:search-service-tenant-updated}")
|
||||||
public void updateTenant(TenantResponse tenant) {
|
public void updateTenant(TenantResponse tenant) {
|
||||||
|
|
||||||
String numberOfReplicas = tenant.getSearchConnection().getNumberOfReplicas();
|
String numberOfReplicas = tenant.getSearchConnection().getNumberOfReplicas();
|
||||||
|
|||||||
@ -6,6 +6,8 @@ import java.util.Set;
|
|||||||
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||||
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
|
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
|
||||||
@ -52,6 +54,12 @@ public abstract class AbstractElasticsearchIntegrationTest {
|
|||||||
@MockBean
|
@MockBean
|
||||||
private TenantsClient tenantsClient;
|
private TenantsClient tenantsClient;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RabbitAdmin rabbitAdmin;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
|
||||||
|
|
||||||
private static int port;
|
private static int port;
|
||||||
|
|
||||||
|
|
||||||
@ -60,15 +68,15 @@ public abstract class AbstractElasticsearchIntegrationTest {
|
|||||||
|
|
||||||
TenantContext.setTenantId("redaction");
|
TenantContext.setTenantId("redaction");
|
||||||
when(tenantsClient.getTenant("redaction")).thenReturn(TenantResponse.builder()
|
when(tenantsClient.getTenant("redaction")).thenReturn(TenantResponse.builder()
|
||||||
.searchConnection(SearchConnection.builder()
|
.searchConnection(SearchConnection.builder()
|
||||||
.hosts(Set.of("localhost"))
|
.hosts(Set.of("localhost"))
|
||||||
.port(port)
|
.port(port)
|
||||||
.scheme("http")
|
.scheme("http")
|
||||||
.numberOfShards("1")
|
.numberOfShards("1")
|
||||||
.numberOfReplicas("5")
|
.numberOfReplicas("5")
|
||||||
.indexPrefix("indexprefix")
|
.indexPrefix("indexprefix")
|
||||||
.build())
|
.build())
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -7,7 +7,9 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
|||||||
@ -3,7 +3,9 @@ package com.iqser.red.service.search.v1.server.service;
|
|||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
|||||||
@ -7,7 +7,9 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
@ -54,6 +56,12 @@ public class OpensearchTest extends AbstractOpensearchIntegrationTest {
|
|||||||
@MockBean
|
@MockBean
|
||||||
private RabbitTemplate rabbitTemplate;
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RabbitAdmin rabbitAdmin;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
|
||||||
|
|
||||||
@MockBean
|
@MockBean
|
||||||
private IndexDeleteService indexDeleteService;
|
private IndexDeleteService indexDeleteService;
|
||||||
|
|
||||||
|
|||||||
@ -32,3 +32,5 @@ persistence-service.url: 'http://mock.url'
|
|||||||
|
|
||||||
server:
|
server:
|
||||||
port: 19547
|
port: 19547
|
||||||
|
|
||||||
|
POD_NAME: search-service
|
||||||
Loading…
x
Reference in New Issue
Block a user