From fe28c1463a834d8183d0ed97e9d777bf1a9c2434 Mon Sep 17 00:00:00 2001 From: Timo Bejan Date: Thu, 28 Mar 2024 16:07:10 +0200 Subject: [PATCH 1/2] hotfix es client close/jvm error --- .../DocumentDeleteServiceImpl.java | 1 + .../DocumentIndexServiceImpl.java | 1 + .../DocumentUpdateServiceImpl.java | 1 + .../service/elasticsearch/EsClient.java | 17 ++++++---- .../service/elasticsearch/EsClientCache.java | 32 +++++++++++++++++-- .../elasticsearch/SearchServiceImpl.java | 1 + .../opensearch/DocumentDeleteServiceImpl.java | 1 + .../opensearch/DocumentIndexServiceImpl.java | 1 + .../opensearch/DocumentUpdateServiceImpl.java | 1 + .../service/opensearch/OpensearchClient.java | 26 +++++++-------- .../opensearch/OpensearchClientCache.java | 31 ++++++++++++++++-- .../service/opensearch/SearchServiceImpl.java | 1 + 12 files changed, 89 insertions(+), 25 deletions(-) diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java index 12ee950..a5ae63f 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java @@ -34,6 +34,7 @@ public class DocumentDeleteServiceImpl implements DocumentDeleteService { try { clientCache.getClient().delete(request); } catch (IOException | ElasticsearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentDeleteError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java index 99db581..b46d73b 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java @@ -37,6 +37,7 @@ public class DocumentIndexServiceImpl implements DocumentIndexService { .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())) .document(indexDocument)); } catch (IOException | ElasticsearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentIndexError(indexDocument.getFileId(), e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java index bed0165..9702187 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java @@ -37,6 +37,7 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService { .doc(indexDocumentUpdate) .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class); } catch (IOException | ElasticsearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentUpdateError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java index 6a73b31..c28e29c 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java @@ -2,6 +2,7 @@ package com.iqser.red.service.search.v1.server.service.elasticsearch; import java.util.stream.Collectors; +import lombok.SneakyThrows; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -23,9 +24,6 @@ import lombok.experimental.Delegate; @SuppressWarnings("PMD") public class EsClient { - // Lower timeouts should be set per request. - private static final int ABSURD_HIGH_TIMEOUT = 90_000_000; - private SearchConnection searchConnection; @Delegate @@ -37,11 +35,12 @@ public class EsClient { HttpHost[] httpHost = searchConnection.getHosts() .stream() .map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme())) - .collect(Collectors.toList()) + .toList() .toArray(new HttpHost[searchConnection.getHosts().size()]); - RestClientBuilder builder = RestClient.builder(httpHost) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT)); + var builder = RestClient.builder(httpHost) + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(30_000) + .setSocketTimeout(120_000)); if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -55,4 +54,10 @@ public class EsClient { this.elasticsearchClient = new ElasticsearchClient(transport); } + @SneakyThrows + public void terminate() { + + elasticsearchClient._transport().close(); + } + } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java index deb6ae2..210967f 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java @@ -38,6 +38,30 @@ public class EsClientCache { private LoadingCache clients; + @SneakyThrows + public void isClientAliveOrTerminate() { + + try { + var client = clients.get(TenantContext.getTenantId()); + try { + + log.info("Checking if client is still alive: {}", client.info()); + } catch (Exception e) { + + try { + client.terminate(); + } catch (Exception e2) { + + log.info("Failed to terminate ES Client"); + clients.invalidate(TenantContext.getTenantId()); + } + } + }catch (Exception e){ + log.error("Failed to terminate/invalide client", e); + } + } + + @PostConstruct protected void createCache() { @@ -45,8 +69,12 @@ public class EsClientCache { .maximumSize(maximumSize) .expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES) .removalListener((RemovalListener) removal -> { - removal.getValue().shutdown(); - log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + try { + removal.getValue().terminate(); + log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + } catch (Exception e) { + log.info("Failed to close elasticsearch client for tenant {}", removal.getKey()); + } }) .build(new CacheLoader<>() { public EsClient load(String tenantId) { diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java index bc0235f..5c2f63d 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java @@ -109,6 +109,7 @@ public class SearchServiceImpl implements SearchService { try { return clientCache.getClient().search(searchRequest, IndexDocument.class); } catch (IOException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.searchFailed(e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java index 76b8d34..fd6ab5d 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java @@ -34,6 +34,7 @@ public class DocumentDeleteServiceImpl implements DocumentDeleteService { try { clientCache.getClient().delete(request); } catch (IOException | OpenSearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentDeleteError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java index 563d7dc..55fed67 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java @@ -36,6 +36,7 @@ public class DocumentIndexServiceImpl implements DocumentIndexService { .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())) .document(indexDocument)); } catch (IOException | OpenSearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentIndexError(indexDocument.getFileId(), e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java index c16e3f2..c16c865 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java @@ -37,6 +37,7 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService { .doc(indexDocumentUpdate) .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class); } catch (IOException | OpenSearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentUpdateError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java index a6f50d4..162867b 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java @@ -1,24 +1,19 @@ package com.iqser.red.service.search.v1.server.service.opensearch; -import java.util.stream.Collectors; - +import com.knecon.fforesight.tenantcommons.model.SearchConnection; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.experimental.Delegate; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.rest_client.RestClientTransport; -import com.knecon.fforesight.tenantcommons.model.SearchConnection; - -import jakarta.annotation.PreDestroy; -import lombok.Data; -import lombok.experimental.Delegate; - @Data @SuppressWarnings("PMD") public class OpensearchClient { @@ -37,11 +32,12 @@ public class OpensearchClient { HttpHost[] httpHost = searchConnection.getHosts() .stream() .map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme())) - .collect(Collectors.toList()) + .toList() .toArray(new HttpHost[searchConnection.getHosts().size()]); - RestClientBuilder builder = RestClient.builder(httpHost) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT)); + var builder = RestClient.builder(httpHost) + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(30_000) + .setSocketTimeout(120_000)); if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -56,10 +52,10 @@ public class OpensearchClient { } - @PreDestroy - public void onShutdown() { + @SneakyThrows + public void terminate() { - client.shutdown(); + client._transport().close(); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java index a5a118f..26f99f2 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java @@ -38,6 +38,29 @@ public class OpensearchClientCache { private LoadingCache clients; + @SneakyThrows + public void isClientAliveOrTerminate() { + + try { + var client = clients.get(TenantContext.getTenantId()); + try { + + log.info("Checking if client is still alive: {}", client.info()); + } catch (Exception e) { + + try { + client.terminate(); + } catch (Exception e2) { + + log.info("Failed to terminate ES Client"); + clients.invalidate(TenantContext.getTenantId()); + } + } + }catch (Exception e){ + log.error("Failed to terminate/invalide client", e); + } + } + @PostConstruct protected void createCache() { @@ -45,8 +68,12 @@ public class OpensearchClientCache { .maximumSize(maximumSize) .expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES) .removalListener((RemovalListener) removal -> { - removal.getValue().shutdown(); - log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + try { + removal.getValue().terminate(); + log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + } catch (Exception e) { + log.info("Failed to close elasticsearch client for tenant {}", removal.getKey()); + } }) .build(new CacheLoader<>() { public OpensearchClient load(String tenantId) { diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java index 8e44bf2..e6684f8 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java @@ -110,6 +110,7 @@ public class SearchServiceImpl implements SearchService { try { return clientCache.getClient().search(searchRequest, IndexDocument.class); } catch (IOException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.searchFailed(e); } } From 18abdedaf877e888b9ba9fd2e59360a643b8f521 Mon Sep 17 00:00:00 2001 From: Timo Bejan Date: Thu, 28 Mar 2024 16:30:13 +0200 Subject: [PATCH 2/2] aligned timeouts with cache expire time --- .../search/v1/server/service/elasticsearch/EsClient.java | 7 +++++-- .../v1/server/service/opensearch/OpensearchClient.java | 7 ++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java index c28e29c..835dcf5 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java @@ -24,6 +24,9 @@ import lombok.experimental.Delegate; @SuppressWarnings("PMD") public class EsClient { + // Lower timeouts should be set per request. + private static final int ABSURD_HIGH_TIMEOUT = 600_000; + private SearchConnection searchConnection; @Delegate @@ -39,8 +42,8 @@ public class EsClient { .toArray(new HttpHost[searchConnection.getHosts().size()]); var builder = RestClient.builder(httpHost) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(30_000) - .setSocketTimeout(120_000)); + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT) + .setSocketTimeout(ABSURD_HIGH_TIMEOUT)); if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java index 162867b..81d0fed 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java @@ -19,7 +19,7 @@ import org.opensearch.client.transport.rest_client.RestClientTransport; public class OpensearchClient { // Lower timeouts should be set per request. - private static final int ABSURD_HIGH_TIMEOUT = 90_000_000; + private static final int ABSURD_HIGH_TIMEOUT = 600_000; private SearchConnection searchConnection; @@ -36,8 +36,9 @@ public class OpensearchClient { .toArray(new HttpHost[searchConnection.getHosts().size()]); var builder = RestClient.builder(httpHost) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(30_000) - .setSocketTimeout(120_000)); + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(0) + .setConnectionRequestTimeout(ABSURD_HIGH_TIMEOUT) + .setSocketTimeout(ABSURD_HIGH_TIMEOUT)); if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();