Merge branch 'RED-7128' into 'master'

RED-6668: Handle delete tenant events

Closes RED-7128

See merge request redactmanager/search-service!20
This commit is contained in:
Yannik Hampe 2024-01-26 09:29:56 +01:00
commit 1cef042c1e
7 changed files with 145 additions and 15 deletions

View File

@ -0,0 +1,38 @@
package com.iqser.red.service.search.v1.server.queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
public class DeleteTenantMessageReceiver {
private final IndexDeleteService indexDeleteService;
@Value("${fforesight.multitenancy.tenant-delete-queue:tenant-delete-queue}")
private String tenantDeleteQueue;
@PostConstruct
public void postConstruct() {
log.info("Listener for tenant-delete started for queue: {}", this.tenantDeleteQueue);
}
@RabbitListener(queues = "${fforesight.multitenancy.tenant-delete-queue:tenant-delete-queue}")
public void deleteTenant(TenantResponse tenant) {
indexDeleteService.dropIndex(tenant.getSearchConnection());
}
}

View File

@ -1,7 +1,12 @@
package com.iqser.red.service.search.v1.server.queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -21,6 +26,20 @@ public class MessagingConfiguration {
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
@Value("${fforesight.multitenancy.tenant-delete-queue:tenant-delete-queue}")
private String tenantDeleteEventQueue;
@Value("${fforesight.multitenancy.tenant-delete-dlq:tenant-delete-dlq}")
private String tenantDeleteDLQ;
@Bean
public Binding tenantExchangeDeleteBinding(@Qualifier("tenantUserManagementTenantDeleteQueue") Queue tenantUserManagementTenantDeleteQueue,
@Qualifier("tenantExchange") TopicExchange tenantExchange) {
return BindingBuilder.bind(tenantUserManagementTenantDeleteQueue).to(tenantExchange).with("tenant.delete");
}
@Bean
public Queue indexingQueue() {
@ -52,4 +71,24 @@ public class MessagingConfiguration {
return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build();
}
// Tentant Delete Event Queue
@Bean(name = "tenantExchange")
TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name}") String tenantExchangeName) {
return new TopicExchange(tenantExchangeName);
}
@Bean("tenantUserManagementTenantDeleteQueue")
public Queue tenantDeleteQueue() {
return QueueBuilder.durable(this.tenantDeleteEventQueue).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", this.tenantDeleteDLQ).build();
}
@Bean
public Queue tenantDeleteDLQ() {
return QueueBuilder.durable(this.tenantDeleteDLQ).build();
}
}

View File

@ -1,5 +1,7 @@
package com.iqser.red.service.search.v1.server.service;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
public interface IndexDeleteService {
void recreateIndex();
@ -10,4 +12,6 @@ public interface IndexDeleteService {
void dropIndex();
void dropIndex(SearchConnection searchConnection);
}

View File

@ -6,6 +6,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.iqser.red.service.search.v1.server.utils.IndexNameHelper;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -32,9 +33,31 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void closeIndex() {
var closeIndexResponse = clientCache.getClient()
.indices()
.close(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
closeIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
@SneakyThrows
public void dropIndex() {
dropIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
public void dropIndex(SearchConnection searchConnection) {
var client = new EsClient(searchConnection);
closeIndex(client, searchConnection.getIndexPrefix());
dropIndex(client, searchConnection.getIndexPrefix());
}
@SneakyThrows
private void closeIndex(EsClient client, String indexPrefix) {
var closeIndexResponse = client.indices()
.close(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (closeIndexResponse.acknowledged()) {
log.info("Index is closed");
} else {
@ -44,12 +67,11 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void dropIndex() {
private void dropIndex(EsClient client, String indexPrefix) {
log.info("Will drop index");
var deleteIndexResponse = clientCache.getClient()
.indices()
.delete(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
var deleteIndexResponse = client.indices()
.delete(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (deleteIndexResponse.acknowledged()) {
log.info("Index is dropped");

View File

@ -6,6 +6,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.iqser.red.service.search.v1.server.utils.IndexNameHelper;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -32,9 +33,29 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void closeIndex() {
var closeIndexResponse = clientCache.getClient()
.indices()
.close(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
closeIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
@SneakyThrows
public void dropIndex() {
dropIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
public void dropIndex(SearchConnection searchConnection) {
var client = new OpensearchClient(searchConnection);
closeIndex(client, searchConnection.getIndexPrefix());
dropIndex(client, searchConnection.getIndexPrefix());
}
@SneakyThrows
private void closeIndex(OpensearchClient opensearchClient, String indexPrefix) {
var closeIndexResponse = opensearchClient.indices().close(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (closeIndexResponse.acknowledged()) {
log.info("Index is closed");
} else {
@ -44,12 +65,10 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void dropIndex() {
private void dropIndex(OpensearchClient opensearchClient, String indexPrefix) {
log.info("Will drop index");
var deleteIndexResponse = clientCache.getClient()
.indices()
.delete(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
var deleteIndexResponse = opensearchClient.indices().delete(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (deleteIndexResponse.acknowledged()) {
log.info("Index is dropped");

View File

@ -3,7 +3,10 @@ info:
persistence-service.url: "http://persistence-service-v1:8080"
tenant-user-management-service.url: "http://tenant-user-management-service:8080/internal"
fforesight.tenants.remote: true
fforesight:
tenants.remote: true
tenant-exchange.name: 'tenants-exchange'
logging.pattern.level: "%5p [${spring.application.name},%X{traceId:-},%X{spanId:-}]"

View File

@ -7,6 +7,11 @@ logging.type: ${LOGGING_TYPE:CONSOLE}
logging.level.root: INFO
fforesight:
tenant-exchange:
name: 'tenants-exchange'
spring:
main:
allow-bean-definition-overriding: true