Merge branch 'hotfix-es-client-close' into 'master'

hotfix es client close/jvm error

See merge request redactmanager/search-service!24
This commit is contained in:
Timo Bejan 2024-04-05 14:14:37 +02:00
commit e0fb825cf7
12 changed files with 92 additions and 24 deletions

View File

@ -34,6 +34,7 @@ public class DocumentDeleteServiceImpl implements DocumentDeleteService {
try {
clientCache.getClient().delete(request);
} catch (IOException | ElasticsearchException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.documentDeleteError(fileId, e);
}
}

View File

@ -37,6 +37,7 @@ public class DocumentIndexServiceImpl implements DocumentIndexService {
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy()))
.document(indexDocument));
} catch (IOException | ElasticsearchException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.documentIndexError(indexDocument.getFileId(), e);
}
}

View File

@ -37,6 +37,7 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService {
.doc(indexDocumentUpdate)
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class);
} catch (IOException | ElasticsearchException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.documentUpdateError(fileId, e);
}
}

View File

@ -2,6 +2,7 @@ package com.iqser.red.service.search.v1.server.service.elasticsearch;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@ -24,7 +25,7 @@ import lombok.experimental.Delegate;
public class EsClient {
// Lower timeouts should be set per request.
private static final int ABSURD_HIGH_TIMEOUT = 90_000_000;
private static final int ABSURD_HIGH_TIMEOUT = 600_000;
private SearchConnection searchConnection;
@ -37,11 +38,12 @@ public class EsClient {
HttpHost[] httpHost = searchConnection.getHosts()
.stream()
.map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme()))
.collect(Collectors.toList())
.toList()
.toArray(new HttpHost[searchConnection.getHosts().size()]);
RestClientBuilder builder = RestClient.builder(httpHost)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT));
var builder = RestClient.builder(httpHost)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT)
.setSocketTimeout(ABSURD_HIGH_TIMEOUT));
if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
@ -55,4 +57,10 @@ public class EsClient {
this.elasticsearchClient = new ElasticsearchClient(transport);
}
@SneakyThrows
public void terminate() {
elasticsearchClient._transport().close();
}
}

View File

@ -38,6 +38,30 @@ public class EsClientCache {
private LoadingCache<String, EsClient> clients;
@SneakyThrows
public void isClientAliveOrTerminate() {
try {
var client = clients.get(TenantContext.getTenantId());
try {
log.info("Checking if client is still alive: {}", client.info());
} catch (Exception e) {
try {
client.terminate();
} catch (Exception e2) {
log.info("Failed to terminate ES Client");
clients.invalidate(TenantContext.getTenantId());
}
}
}catch (Exception e){
log.error("Failed to terminate/invalide client", e);
}
}
@PostConstruct
protected void createCache() {
@ -45,8 +69,12 @@ public class EsClientCache {
.maximumSize(maximumSize)
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, EsClient>) removal -> {
removal.getValue().shutdown();
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
try {
removal.getValue().terminate();
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
} catch (Exception e) {
log.info("Failed to close elasticsearch client for tenant {}", removal.getKey());
}
})
.build(new CacheLoader<>() {
public EsClient load(String tenantId) {

View File

@ -109,6 +109,7 @@ public class SearchServiceImpl implements SearchService {
try {
return clientCache.getClient().search(searchRequest, IndexDocument.class);
} catch (IOException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.searchFailed(e);
}
}

View File

@ -34,6 +34,7 @@ public class DocumentDeleteServiceImpl implements DocumentDeleteService {
try {
clientCache.getClient().delete(request);
} catch (IOException | OpenSearchException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.documentDeleteError(fileId, e);
}
}

View File

@ -36,6 +36,7 @@ public class DocumentIndexServiceImpl implements DocumentIndexService {
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy()))
.document(indexDocument));
} catch (IOException | OpenSearchException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.documentIndexError(indexDocument.getFileId(), e);
}
}

View File

@ -37,6 +37,7 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService {
.doc(indexDocumentUpdate)
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class);
} catch (IOException | OpenSearchException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.documentUpdateError(fileId, e);
}
}

View File

@ -1,30 +1,25 @@
package com.iqser.red.service.search.v1.server.service.opensearch;
import java.util.stream.Collectors;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.experimental.Delegate;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import com.knecon.fforesight.tenantcommons.model.SearchConnection;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.experimental.Delegate;
@Data
@SuppressWarnings("PMD")
public class OpensearchClient {
// Lower timeouts should be set per request.
private static final int ABSURD_HIGH_TIMEOUT = 90_000_000;
private static final int ABSURD_HIGH_TIMEOUT = 600_000;
private SearchConnection searchConnection;
@ -37,11 +32,13 @@ public class OpensearchClient {
HttpHost[] httpHost = searchConnection.getHosts()
.stream()
.map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme()))
.collect(Collectors.toList())
.toList()
.toArray(new HttpHost[searchConnection.getHosts().size()]);
RestClientBuilder builder = RestClient.builder(httpHost)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT));
var builder = RestClient.builder(httpHost)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(0)
.setConnectionRequestTimeout(ABSURD_HIGH_TIMEOUT)
.setSocketTimeout(ABSURD_HIGH_TIMEOUT));
if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
@ -56,10 +53,10 @@ public class OpensearchClient {
}
@PreDestroy
public void onShutdown() {
@SneakyThrows
public void terminate() {
client.shutdown();
client._transport().close();
}
}

View File

@ -38,6 +38,29 @@ public class OpensearchClientCache {
private LoadingCache<String, OpensearchClient> clients;
@SneakyThrows
public void isClientAliveOrTerminate() {
try {
var client = clients.get(TenantContext.getTenantId());
try {
log.info("Checking if client is still alive: {}", client.info());
} catch (Exception e) {
try {
client.terminate();
} catch (Exception e2) {
log.info("Failed to terminate ES Client");
clients.invalidate(TenantContext.getTenantId());
}
}
}catch (Exception e){
log.error("Failed to terminate/invalide client", e);
}
}
@PostConstruct
protected void createCache() {
@ -45,8 +68,12 @@ public class OpensearchClientCache {
.maximumSize(maximumSize)
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, OpensearchClient>) removal -> {
removal.getValue().shutdown();
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
try {
removal.getValue().terminate();
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
} catch (Exception e) {
log.info("Failed to close elasticsearch client for tenant {}", removal.getKey());
}
})
.build(new CacheLoader<>() {
public OpensearchClient load(String tenantId) {

View File

@ -110,6 +110,7 @@ public class SearchServiceImpl implements SearchService {
try {
return clientCache.getClient().search(searchRequest, IndexDocument.class);
} catch (IOException e) {
clientCache.isClientAliveOrTerminate();
throw IndexException.searchFailed(e);
}
}