RED-6224: Reimplemented client cache logic
This commit is contained in:
parent
0471b7f52c
commit
5da599bc9b
@ -0,0 +1,20 @@
|
||||
package com.iqser.red.service.search.v1.server.model;
|
||||
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.multitenancy.SearchConnection;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
|
||||
public class Connection {
|
||||
|
||||
@EqualsAndHashCode.Include
|
||||
private String hosts;
|
||||
private SearchConnection searchConnection;
|
||||
|
||||
}
|
||||
@ -14,6 +14,7 @@ import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.iqser.red.service.search.v1.server.client.TenantsClient;
|
||||
import com.iqser.red.service.search.v1.server.model.Connection;
|
||||
import com.iqser.red.service.search.v1.server.multitenancy.EncryptionDecryptionService;
|
||||
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
|
||||
|
||||
@ -37,49 +38,36 @@ public class EsClientCache {
|
||||
@Value("${multitenancy.client-cache.expireAfterAccess:10}")
|
||||
private Integer expireAfterAccess;
|
||||
|
||||
private LoadingCache<String, EsClient> clients;
|
||||
private LoadingCache<String, Connection> connections;
|
||||
private LoadingCache<Connection, EsClient> clients;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
protected void createCache() {
|
||||
|
||||
connections = CacheBuilder.newBuilder().maximumSize(maximumSize).expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES).build(new CacheLoader<>() {
|
||||
public Connection load(String tenantId) {
|
||||
|
||||
var tenant = tenantsClient.getTenant(tenantId);
|
||||
var hostsAsString = tenant.getSearchConnection().getHosts().stream().collect(Collectors.joining());
|
||||
return Connection.builder().hosts(hostsAsString).searchConnection(tenant.getSearchConnection()).build();
|
||||
}
|
||||
});
|
||||
|
||||
clients = CacheBuilder.newBuilder()
|
||||
.maximumSize(maximumSize)
|
||||
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
|
||||
.removalListener((RemovalListener<String, EsClient>) removal -> {
|
||||
var clientToRemove = removal.getValue();
|
||||
int numberOfUsersForSameClient = 0;
|
||||
for (var client : clients.asMap().values()) {
|
||||
if (clientToRemove.getElasticsearchClient().equals(client.getElasticsearchClient())){
|
||||
numberOfUsersForSameClient++;
|
||||
}
|
||||
}
|
||||
if(numberOfUsersForSameClient == 0){
|
||||
clientToRemove.shutdown();
|
||||
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
|
||||
} else {
|
||||
log.info("Keeping client open from {} because it is still used by {} other tenants", removal.getKey(), numberOfUsersForSameClient);
|
||||
}
|
||||
|
||||
.removalListener((RemovalListener<Connection, EsClient>) removal -> {
|
||||
removal.getValue().shutdown();
|
||||
log.info("Closed elasticsearch client for tenant {}", removal.getKey().getHosts());
|
||||
})
|
||||
.build(new CacheLoader<>() {
|
||||
public EsClient load(String key) {
|
||||
public EsClient load(Connection key) {
|
||||
|
||||
var tenant = tenantsClient.getTenant(key);
|
||||
|
||||
// Do not create new client if client with equal hosts is already available.
|
||||
var hostsAsString = tenant.getSearchConnection().getHosts().stream().collect(Collectors.joining());
|
||||
for (var client : clients.asMap().values()) {
|
||||
if (client.getSearchConnection().getHosts().stream().collect(Collectors.joining()).equals(hostsAsString)) {
|
||||
indexCreatorService.createIndex(client);
|
||||
return client;
|
||||
}
|
||||
if (key.getSearchConnection().getPassword() != null) {
|
||||
key.getSearchConnection().setPassword(encryptionDecryptionService.decrypt(key.getSearchConnection().getPassword()));
|
||||
}
|
||||
|
||||
if (tenant.getSearchConnection().getPassword() != null) {
|
||||
tenant.getSearchConnection().setPassword(encryptionDecryptionService.decrypt(tenant.getSearchConnection().getPassword()));
|
||||
}
|
||||
var client = new EsClient(tenant.getSearchConnection());
|
||||
var client = new EsClient(key.getSearchConnection());
|
||||
log.info("Initialized elasticsearch client for tenant {}", key);
|
||||
indexCreatorService.createIndex(client);
|
||||
return client;
|
||||
@ -91,7 +79,8 @@ public class EsClientCache {
|
||||
@SneakyThrows
|
||||
public EsClient getClient() {
|
||||
|
||||
return clients.get(TenantContext.getTenantId());
|
||||
var connection = connections.get(TenantContext.getTenantId());
|
||||
return clients.get(connection);
|
||||
}
|
||||
|
||||
}
|
||||
@ -14,6 +14,7 @@ import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.iqser.red.service.search.v1.server.client.TenantsClient;
|
||||
import com.iqser.red.service.search.v1.server.model.Connection;
|
||||
import com.iqser.red.service.search.v1.server.multitenancy.EncryptionDecryptionService;
|
||||
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
|
||||
|
||||
@ -37,50 +38,37 @@ public class OpensearchClientCache {
|
||||
@Value("${multitenancy.client-cache.expireAfterAccess:10}")
|
||||
private Integer expireAfterAccess;
|
||||
|
||||
private LoadingCache<String, OpensearchClient> clients;
|
||||
private LoadingCache<String, Connection> connections;
|
||||
private LoadingCache<Connection, OpensearchClient> clients;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
protected void createCache() {
|
||||
|
||||
connections = CacheBuilder.newBuilder().maximumSize(maximumSize).expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES).build(new CacheLoader<>() {
|
||||
public Connection load(String tenantId) {
|
||||
|
||||
var tenant = tenantsClient.getTenant(tenantId);
|
||||
var hostsAsString = tenant.getSearchConnection().getHosts().stream().collect(Collectors.joining());
|
||||
return Connection.builder().hosts(hostsAsString).searchConnection(tenant.getSearchConnection()).build();
|
||||
}
|
||||
});
|
||||
|
||||
clients = CacheBuilder.newBuilder()
|
||||
.maximumSize(maximumSize)
|
||||
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
|
||||
.removalListener((RemovalListener<String, OpensearchClient>) removal -> {
|
||||
var clientToRemove = removal.getValue();
|
||||
int numberOfUsersForSameClient = 0;
|
||||
for (var client : clients.asMap().values()) {
|
||||
if (clientToRemove.getClient().equals(client.getClient())){
|
||||
numberOfUsersForSameClient++;
|
||||
}
|
||||
}
|
||||
if(numberOfUsersForSameClient == 0){
|
||||
clientToRemove.shutdown();
|
||||
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
|
||||
} else {
|
||||
log.info("Keeping client open from {} because it is still used by {} other tenants", removal.getKey(), numberOfUsersForSameClient);
|
||||
}
|
||||
|
||||
.removalListener((RemovalListener<Connection, OpensearchClient>) removal -> {
|
||||
removal.getValue().shutdown();
|
||||
log.info("Closed opensearch client for tenant {}", removal.getKey().getHosts());
|
||||
})
|
||||
.build(new CacheLoader<>() {
|
||||
public OpensearchClient load(String key) {
|
||||
public OpensearchClient load(Connection key) {
|
||||
|
||||
var tenant = tenantsClient.getTenant(key);
|
||||
|
||||
// Do not create new client if client with equal hosts is already available.
|
||||
var hostsAsString = tenant.getSearchConnection().getHosts().stream().collect(Collectors.joining());
|
||||
for (var client : clients.asMap().values()) {
|
||||
if (client.getSearchConnection().getHosts().stream().collect(Collectors.joining()).equals(hostsAsString)) {
|
||||
indexCreatorService.createIndex(client);
|
||||
return client;
|
||||
}
|
||||
if (key.getSearchConnection().getPassword() != null) {
|
||||
key.getSearchConnection().setPassword(encryptionDecryptionService.decrypt(key.getSearchConnection().getPassword()));
|
||||
}
|
||||
|
||||
if (tenant.getSearchConnection().getPassword() != null) {
|
||||
tenant.getSearchConnection().setPassword(encryptionDecryptionService.decrypt(tenant.getSearchConnection().getPassword()));
|
||||
}
|
||||
var client = new OpensearchClient(tenant.getSearchConnection());
|
||||
log.info("Initialized elasticsearch client for tenant {}", key);
|
||||
var client = new OpensearchClient(key.getSearchConnection());
|
||||
log.info("Initialized opensearch client for tenant {}", key);
|
||||
indexCreatorService.createIndex(client);
|
||||
return client;
|
||||
}
|
||||
@ -91,7 +79,8 @@ public class OpensearchClientCache {
|
||||
@SneakyThrows
|
||||
public OpensearchClient getClient() {
|
||||
|
||||
return clients.get(TenantContext.getTenantId());
|
||||
var connection = connections.get(TenantContext.getTenantId());
|
||||
return clients.get(connection);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user