RED-7128: Handle delete tenant events

This commit is contained in:
Dominique Eifländer 2024-01-22 15:15:23 +01:00
parent 03c6248b84
commit 496c6a1da8
5 changed files with 124 additions and 14 deletions

View File

@ -0,0 +1,38 @@
package com.iqser.red.service.search.v1.server.queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
public class DeleteTenantListener {
private final IndexDeleteService indexDeleteService;
@Value("${fforesight.multitenancy.tenant-delete-queue:tenant-delete}")
private String tenantDeleteQueue;
@PostConstruct
public void postConstruct() {
log.info("Listener for tenant-delete started for queue: {}", this.tenantDeleteQueue);
}
@RabbitListener(queues = "${fforesight.multitenancy.tenant-delete-queue:tenant-delete}")
public void deleteTenant(TenantResponse tenant) {
indexDeleteService.dropIndex(tenant.getSearchConnection());
}
}

View File

@ -2,6 +2,8 @@ package com.iqser.red.service.search.v1.server.queue;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -21,6 +23,13 @@ public class MessagingConfiguration {
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
@Value("${fforesight.multitenancy.tenant-delete-queue:tenant-delete}")
private String tenantDeleteEventQueue;
@Value("${fforesight.multitenancy.tenant-delete-dlq:tenant-delete-dlq}")
private String tenantDeleteDLQ;
@Bean
public Queue indexingQueue() {
@ -52,4 +61,22 @@ public class MessagingConfiguration {
return QueueBuilder.durable(DELETE_FROM_INDEX_DLQ).build();
}
// Tentant Delete Event Queue
TopicExchange tenantExchange(@Value("${fforesight.tenant-exchange.name}") String tenantExchangeName) {
return new TopicExchange(tenantExchangeName);
}
public Queue tenantDeleteQueue() {
return QueueBuilder.durable(this.tenantDeleteEventQueue).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", this.tenantDeleteDLQ).build();
}
@Bean
public Queue tenantDeleteDLQ() {
return QueueBuilder.durable(this.tenantDeleteDLQ).build();
}
}

View File

@ -1,5 +1,7 @@
package com.iqser.red.service.search.v1.server.service;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
public interface IndexDeleteService {
void recreateIndex();
@ -10,4 +12,6 @@ public interface IndexDeleteService {
void dropIndex();
void dropIndex(SearchConnection searchConnection);
}

View File

@ -6,6 +6,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.iqser.red.service.search.v1.server.utils.IndexNameHelper;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -32,9 +33,31 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void closeIndex() {
var closeIndexResponse = clientCache.getClient()
.indices()
.close(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
closeIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
@SneakyThrows
public void dropIndex() {
dropIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
public void dropIndex(SearchConnection searchConnection) {
var client = new EsClient(searchConnection);
closeIndex(client, searchConnection.getIndexPrefix());
dropIndex(client, searchConnection.getIndexPrefix());
}
@SneakyThrows
private void closeIndex(EsClient client, String indexPrefix) {
var closeIndexResponse = client.indices()
.close(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (closeIndexResponse.acknowledged()) {
log.info("Index is closed");
} else {
@ -44,12 +67,11 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void dropIndex() {
private void dropIndex(EsClient client, String indexPrefix) {
log.info("Will drop index");
var deleteIndexResponse = clientCache.getClient()
.indices()
.delete(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
var deleteIndexResponse = client.indices()
.delete(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (deleteIndexResponse.acknowledged()) {
log.info("Index is dropped");

View File

@ -6,6 +6,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.iqser.red.service.search.v1.server.utils.IndexNameHelper;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -32,9 +33,29 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void closeIndex() {
var closeIndexResponse = clientCache.getClient()
.indices()
.close(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
closeIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
@SneakyThrows
public void dropIndex() {
dropIndex(clientCache.getClient(), clientCache.getClient().getSearchConnection().getIndexPrefix());
}
public void dropIndex(SearchConnection searchConnection) {
var client = new OpensearchClient(searchConnection);
closeIndex(client, searchConnection.getIndexPrefix());
dropIndex(client, searchConnection.getIndexPrefix());
}
@SneakyThrows
private void closeIndex(OpensearchClient opensearchClient, String indexPrefix) {
var closeIndexResponse = opensearchClient.indices().close(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (closeIndexResponse.acknowledged()) {
log.info("Index is closed");
} else {
@ -44,12 +65,10 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
@SneakyThrows
public void dropIndex() {
private void dropIndex(OpensearchClient opensearchClient, String indexPrefix) {
log.info("Will drop index");
var deleteIndexResponse = clientCache.getClient()
.indices()
.delete(i -> i.index(IndexNameHelper.getSearchIndex(clientCache.getClient().getSearchConnection().getIndexPrefix())).timeout(t -> t.time("2m")));
var deleteIndexResponse = opensearchClient.indices().delete(i -> i.index(IndexNameHelper.getSearchIndex(indexPrefix)).timeout(t -> t.time("2m")));
if (deleteIndexResponse.acknowledged()) {
log.info("Index is dropped");