RED-9872: Tenant Management issues
This commit is contained in:
parent
0a7b6cddb2
commit
8e717a5067
@ -92,10 +92,12 @@ public class IndexingMessageReceiver {
|
||||
|
||||
log.info("Processing indexing request: {}", indexRequest);
|
||||
|
||||
FileModel fileStatus;
|
||||
Dossier dossier;
|
||||
switch (indexRequest.getMessageType()) {
|
||||
case INSERT:
|
||||
var fileStatus = fileStatusClient.getFileStatus(indexRequest.getDossierId(), indexRequest.getFileId());
|
||||
var dossier = dossierClient.getDossierById(indexRequest.getDossierId(), true, true);
|
||||
fileStatus = fileStatusClient.getFileStatus(indexRequest.getDossierId(), indexRequest.getFileId());
|
||||
dossier = dossierClient.getDossierById(indexRequest.getDossierId(), true, true);
|
||||
indexFile(dossier, fileStatus);
|
||||
break;
|
||||
|
||||
@ -103,14 +105,18 @@ public class IndexingMessageReceiver {
|
||||
fileStatus = fileStatusClient.getFileStatus(indexRequest.getDossierId(), indexRequest.getFileId());
|
||||
dossier = dossierClient.getDossierById(indexRequest.getDossierId(), true, true);
|
||||
|
||||
var indexUpdateDocument = indexDocumentConverterService.convertUpdateDocument(fileStatus.getAssignee(),
|
||||
dossier.getSoftDeletedTime() != null,
|
||||
dossier.getArchivedTime() != null,
|
||||
fileStatus.getWorkflowStatus().name(),
|
||||
fileStatus.getFileAttributes());
|
||||
if(documentUpdateService.documentExists(indexRequest.getFileId())) {
|
||||
var indexUpdateDocument = indexDocumentConverterService.convertUpdateDocument(fileStatus.getAssignee(),
|
||||
dossier.getSoftDeletedTime() != null,
|
||||
dossier.getArchivedTime() != null,
|
||||
fileStatus.getWorkflowStatus().name(),
|
||||
fileStatus.getFileAttributes());
|
||||
|
||||
documentUpdateService.updateDocument(indexRequest.getFileId(), indexUpdateDocument);
|
||||
log.info("Successfully updated {}", indexRequest);
|
||||
documentUpdateService.updateDocument(indexRequest.getFileId(), indexUpdateDocument);
|
||||
log.info("Successfully updated {}", indexRequest);
|
||||
} else {
|
||||
indexFile(dossier, fileStatus);
|
||||
}
|
||||
break;
|
||||
|
||||
case DROP:
|
||||
|
||||
@ -30,6 +30,11 @@ public class MessagingConfiguration {
|
||||
@Value("${fforesight.multitenancy.tenant-delete-dlq:tenant-delete-dlq}")
|
||||
private String tenantDeleteDLQName;
|
||||
|
||||
@Value("${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
||||
private String tenantUpdatedEventQueueName;
|
||||
@Value("${fforesight.multitenancy.tenant-updated-dlq:tenant-updated-dlq}")
|
||||
private String tenantUpdatedDLQName;
|
||||
|
||||
|
||||
@Bean
|
||||
public Binding tenantExchangeDeleteBinding(@Qualifier("tenantUserManagementTenantDeleteQueue") Queue tenantUserManagementTenantDeleteQueue,
|
||||
@ -39,6 +44,14 @@ public class MessagingConfiguration {
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Binding tenantExchangeUpdatedBinding(@Qualifier("tenantUserManagementTenantUpdatedQueue") Queue tenantUserManagementTenantUpdatedQueue,
|
||||
@Qualifier("tenantExchange") TopicExchange tenantExchange) {
|
||||
|
||||
return BindingBuilder.bind(tenantUserManagementTenantUpdatedQueue).to(tenantExchange).with("tenant.updated");
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue indexingQueue() {
|
||||
|
||||
@ -96,4 +109,21 @@ public class MessagingConfiguration {
|
||||
return QueueBuilder.durable(this.tenantDeleteDLQName).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean("tenantUserManagementTenantUpdatedQueue")
|
||||
public Queue tenantUpdatedQueue() {
|
||||
|
||||
return QueueBuilder.durable(this.tenantUpdatedEventQueueName)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", this.tenantUpdatedDLQName)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue tenantUpdatedDLQ() {
|
||||
|
||||
return QueueBuilder.durable(this.tenantUpdatedDLQName).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,48 @@
|
||||
package com.iqser.red.service.search.v1.server.queue;
|
||||
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
|
||||
import com.iqser.red.service.search.v1.server.service.IndexQueryResult;
|
||||
import com.iqser.red.service.search.v1.server.service.IndexQueryService;
|
||||
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class UpdatedTenantMessageReceiver {
|
||||
|
||||
private final IndexQueryService indexQueryService;
|
||||
private final IndexDeleteService indexDeleteService;
|
||||
|
||||
@Value("${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
||||
private String tenantUpdatedQueue;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void postConstruct() {
|
||||
|
||||
log.info("Listener for tenant updated events started for queue: {}", this.tenantUpdatedQueue);
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = "${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
||||
public void updateTenant(TenantResponse tenant) {
|
||||
|
||||
String numberOfReplicas = tenant.getSearchConnection().getNumberOfReplicas();
|
||||
String numberOfShards = tenant.getSearchConnection().getNumberOfShards();
|
||||
IndexQueryResult queryResult = indexQueryService.getIndexQueryResult(tenant.getSearchConnection());
|
||||
|
||||
if (queryResult.isIndexFound() && (!numberOfReplicas.equals(queryResult.getNumberOfReplicas()) || !numberOfShards.equals(queryResult.getNumberOfShards()))) {
|
||||
log.info("Number of shards or replicas were changed during tenant update, indices will be recreated");
|
||||
indexDeleteService.recreateIndex(tenant.getSearchConnection());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -5,5 +5,6 @@ import com.iqser.red.service.search.v1.server.model.IndexDocumentUpdate;
|
||||
public interface DocumentUpdateService {
|
||||
|
||||
void updateDocument(String fileId, IndexDocumentUpdate indexDocumentUpdate);
|
||||
boolean documentExists(String fileId);
|
||||
|
||||
}
|
||||
@ -6,6 +6,8 @@ public interface IndexDeleteService {
|
||||
|
||||
void recreateIndex();
|
||||
|
||||
void recreateIndex(SearchConnection searchConnection);
|
||||
|
||||
|
||||
void closeIndex();
|
||||
|
||||
|
||||
@ -0,0 +1,17 @@
|
||||
package com.iqser.red.service.search.v1.server.service;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||
public class IndexQueryResult {
|
||||
|
||||
boolean indexFound;
|
||||
String numberOfShards;
|
||||
String numberOfReplicas;
|
||||
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package com.iqser.red.service.search.v1.server.service;
|
||||
|
||||
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
|
||||
|
||||
public interface IndexQueryService {
|
||||
|
||||
IndexQueryResult getIndexQueryResult(SearchConnection searchConnection);
|
||||
|
||||
}
|
||||
@ -42,4 +42,12 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@Timed("redactmanager_documentExists")
|
||||
public boolean documentExists(String fileId) {
|
||||
|
||||
return clientCache.getClient().exists(e -> e.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).id(fileId)).value();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -30,6 +30,17 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void recreateIndex(SearchConnection searchConnection) {
|
||||
|
||||
var client = new EsClient(searchConnection);
|
||||
closeIndex(client, searchConnection.getIndexPrefix());
|
||||
dropIndex(client, searchConnection.getIndexPrefix());
|
||||
indexCreatorService.createIndex(client);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
public void closeIndex() {
|
||||
|
||||
|
||||
@ -0,0 +1,70 @@
|
||||
package com.iqser.red.service.search.v1.server.service.elasticsearch;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.search.v1.server.service.IndexQueryService;
|
||||
import com.iqser.red.service.search.v1.server.service.IndexQueryResult;
|
||||
import com.iqser.red.service.search.v1.server.utils.IndexNameHelper;
|
||||
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
|
||||
|
||||
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
|
||||
import co.elastic.clients.elasticsearch.indices.GetIndicesSettingsResponse;
|
||||
import co.elastic.clients.elasticsearch.indices.IndexState;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
|
||||
@SuppressWarnings("PMD")
|
||||
public class IndexQueryServiceImpl implements IndexQueryService {
|
||||
|
||||
@SneakyThrows
|
||||
public IndexQueryResult getIndexQueryResult(SearchConnection searchConnection) {
|
||||
|
||||
IndexQueryResult.IndexQueryResultBuilder builder = IndexQueryResult.builder();
|
||||
|
||||
getIndexState(searchConnection).ifPresent(indexState -> {
|
||||
builder.indexFound(true);
|
||||
|
||||
var indexSettings = indexState.settings();
|
||||
if (indexSettings != null) {
|
||||
|
||||
String replicas = indexSettings.numberOfReplicas();
|
||||
String shards = indexSettings.numberOfShards();
|
||||
|
||||
if (indexSettings.index() != null) {
|
||||
|
||||
if (replicas == null) {
|
||||
replicas = indexSettings.index().numberOfReplicas();
|
||||
}
|
||||
if (shards == null) {
|
||||
shards = indexSettings.index().numberOfShards();
|
||||
}
|
||||
}
|
||||
builder.numberOfReplicas(replicas).numberOfShards(shards);
|
||||
}
|
||||
});
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
private Optional<IndexState> getIndexState(SearchConnection searchConnection) {
|
||||
|
||||
var esClient = new EsClient(searchConnection);
|
||||
var indexName = IndexNameHelper.getSearchIndex(esClient.getSearchConnection().getIndexPrefix());
|
||||
try {
|
||||
GetIndicesSettingsResponse settings = esClient.indices().getSettings(i -> i.index(indexName));
|
||||
return Optional.ofNullable(settings.get(indexName));
|
||||
} catch (ElasticsearchException elasticsearchException) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -42,4 +42,12 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@Timed("redactmanager_documentExists")
|
||||
public boolean documentExists(String fileId) {
|
||||
|
||||
return clientCache.getClient().exists(e -> e.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).id(fileId)).value();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -30,6 +30,17 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void recreateIndex(SearchConnection searchConnection) {
|
||||
|
||||
var client = new OpensearchClient(searchConnection);
|
||||
closeIndex(client, searchConnection.getIndexPrefix());
|
||||
dropIndex(client, searchConnection.getIndexPrefix());
|
||||
indexCreatorService.createIndex(client);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
public void closeIndex() {
|
||||
|
||||
@ -55,7 +66,8 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
|
||||
@SneakyThrows
|
||||
private void closeIndex(OpensearchClient opensearchClient, String indexPrefix) {
|
||||
|
||||
var closeIndexResponse = opensearchClient.indices().close(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
|
||||
var closeIndexResponse = opensearchClient.indices()
|
||||
.close(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
|
||||
if (closeIndexResponse.acknowledged()) {
|
||||
log.info("Index is closed");
|
||||
} else {
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
package com.iqser.red.service.search.v1.server.service.opensearch;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.opensearch.client.opensearch._types.OpenSearchException;
|
||||
import org.opensearch.client.opensearch.indices.GetIndicesSettingsResponse;
|
||||
import org.opensearch.client.opensearch.indices.IndexState;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.search.v1.server.service.IndexQueryResult;
|
||||
import com.iqser.red.service.search.v1.server.service.IndexQueryService;
|
||||
import com.iqser.red.service.search.v1.server.utils.IndexNameHelper;
|
||||
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
|
||||
@SuppressWarnings("PMD")
|
||||
public class IndexQueryServiceImpl implements IndexQueryService {
|
||||
|
||||
@SneakyThrows
|
||||
public IndexQueryResult getIndexQueryResult(SearchConnection searchConnection) {
|
||||
|
||||
IndexQueryResult.IndexQueryResultBuilder builder = IndexQueryResult.builder();
|
||||
|
||||
Optional<IndexState> optionalIndexState = getIndexState(searchConnection);
|
||||
if (optionalIndexState.isPresent()) {
|
||||
builder.indexFound(true);
|
||||
var indexSettings = optionalIndexState.get().settings();
|
||||
if (indexSettings != null) {
|
||||
builder.numberOfReplicas(indexSettings.numberOfReplicas()).numberOfShards(indexSettings.numberOfShards());
|
||||
}
|
||||
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private Optional<IndexState> getIndexState(SearchConnection searchConnection) {
|
||||
|
||||
var opensearchClient = new OpensearchClient(searchConnection);
|
||||
var indexName = IndexNameHelper.getSearchIndex(opensearchClient.getSearchConnection().getIndexPrefix());
|
||||
try {
|
||||
GetIndicesSettingsResponse settings = opensearchClient.indices().getSettings(i -> i.index(indexName));
|
||||
return Optional.ofNullable(settings.get(indexName));
|
||||
} catch (OpenSearchException openSearchException) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user