diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java index 12ee950..a5ae63f 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentDeleteServiceImpl.java @@ -34,6 +34,7 @@ public class DocumentDeleteServiceImpl implements DocumentDeleteService { try { clientCache.getClient().delete(request); } catch (IOException | ElasticsearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentDeleteError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java index 99db581..b46d73b 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentIndexServiceImpl.java @@ -37,6 +37,7 @@ public class DocumentIndexServiceImpl implements DocumentIndexService { .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())) .document(indexDocument)); } catch (IOException | ElasticsearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentIndexError(indexDocument.getFileId(), e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java index bed0165..9702187 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/DocumentUpdateServiceImpl.java @@ -37,6 +37,7 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService { .doc(indexDocumentUpdate) .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class); } catch (IOException | ElasticsearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentUpdateError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java index 6a73b31..835dcf5 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClient.java @@ -2,6 +2,7 @@ package com.iqser.red.service.search.v1.server.service.elasticsearch; import java.util.stream.Collectors; +import lombok.SneakyThrows; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -24,7 +25,7 @@ import lombok.experimental.Delegate; public class EsClient { // Lower timeouts should be set per request. - private static final int ABSURD_HIGH_TIMEOUT = 90_000_000; + private static final int ABSURD_HIGH_TIMEOUT = 600_000; private SearchConnection searchConnection; @@ -37,11 +38,12 @@ public class EsClient { HttpHost[] httpHost = searchConnection.getHosts() .stream() .map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme())) - .collect(Collectors.toList()) + .toList() .toArray(new HttpHost[searchConnection.getHosts().size()]); - RestClientBuilder builder = RestClient.builder(httpHost) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT)); + var builder = RestClient.builder(httpHost) + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT) + .setSocketTimeout(ABSURD_HIGH_TIMEOUT)); if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -55,4 +57,10 @@ public class EsClient { this.elasticsearchClient = new ElasticsearchClient(transport); } + @SneakyThrows + public void terminate() { + + elasticsearchClient._transport().close(); + } + } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java index deb6ae2..210967f 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/EsClientCache.java @@ -38,6 +38,30 @@ public class EsClientCache { private LoadingCache clients; + @SneakyThrows + public void isClientAliveOrTerminate() { + + try { + var client = clients.get(TenantContext.getTenantId()); + try { + + log.info("Checking if client is still alive: {}", client.info()); + } catch (Exception e) { + + try { + client.terminate(); + } catch (Exception e2) { + + log.info("Failed to terminate ES Client"); + clients.invalidate(TenantContext.getTenantId()); + } + } + }catch (Exception e){ + log.error("Failed to terminate/invalide client", e); + } + } + + @PostConstruct protected void createCache() { @@ -45,8 +69,12 @@ public class EsClientCache { .maximumSize(maximumSize) .expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES) .removalListener((RemovalListener) removal -> { - removal.getValue().shutdown(); - log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + try { + removal.getValue().terminate(); + log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + } catch (Exception e) { + log.info("Failed to close elasticsearch client for tenant {}", removal.getKey()); + } }) .build(new CacheLoader<>() { public EsClient load(String tenantId) { diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java index bc0235f..5c2f63d 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/elasticsearch/SearchServiceImpl.java @@ -109,6 +109,7 @@ public class SearchServiceImpl implements SearchService { try { return clientCache.getClient().search(searchRequest, IndexDocument.class); } catch (IOException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.searchFailed(e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java index 76b8d34..fd6ab5d 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentDeleteServiceImpl.java @@ -34,6 +34,7 @@ public class DocumentDeleteServiceImpl implements DocumentDeleteService { try { clientCache.getClient().delete(request); } catch (IOException | OpenSearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentDeleteError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java index 563d7dc..55fed67 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentIndexServiceImpl.java @@ -36,6 +36,7 @@ public class DocumentIndexServiceImpl implements DocumentIndexService { .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())) .document(indexDocument)); } catch (IOException | OpenSearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentIndexError(indexDocument.getFileId(), e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java index c16e3f2..c16c865 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/DocumentUpdateServiceImpl.java @@ -37,6 +37,7 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService { .doc(indexDocumentUpdate) .refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class); } catch (IOException | OpenSearchException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.documentUpdateError(fileId, e); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java index a6f50d4..81d0fed 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClient.java @@ -1,30 +1,25 @@ package com.iqser.red.service.search.v1.server.service.opensearch; -import java.util.stream.Collectors; - +import com.knecon.fforesight.tenantcommons.model.SearchConnection; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.experimental.Delegate; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.rest_client.RestClientTransport; -import com.knecon.fforesight.tenantcommons.model.SearchConnection; - -import jakarta.annotation.PreDestroy; -import lombok.Data; -import lombok.experimental.Delegate; - @Data @SuppressWarnings("PMD") public class OpensearchClient { // Lower timeouts should be set per request. - private static final int ABSURD_HIGH_TIMEOUT = 90_000_000; + private static final int ABSURD_HIGH_TIMEOUT = 600_000; private SearchConnection searchConnection; @@ -37,11 +32,13 @@ public class OpensearchClient { HttpHost[] httpHost = searchConnection.getHosts() .stream() .map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme())) - .collect(Collectors.toList()) + .toList() .toArray(new HttpHost[searchConnection.getHosts().size()]); - RestClientBuilder builder = RestClient.builder(httpHost) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT)); + var builder = RestClient.builder(httpHost) + .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(0) + .setConnectionRequestTimeout(ABSURD_HIGH_TIMEOUT) + .setSocketTimeout(ABSURD_HIGH_TIMEOUT)); if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -56,10 +53,10 @@ public class OpensearchClient { } - @PreDestroy - public void onShutdown() { + @SneakyThrows + public void terminate() { - client.shutdown(); + client._transport().close(); } } diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java index a5a118f..26f99f2 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/OpensearchClientCache.java @@ -38,6 +38,29 @@ public class OpensearchClientCache { private LoadingCache clients; + @SneakyThrows + public void isClientAliveOrTerminate() { + + try { + var client = clients.get(TenantContext.getTenantId()); + try { + + log.info("Checking if client is still alive: {}", client.info()); + } catch (Exception e) { + + try { + client.terminate(); + } catch (Exception e2) { + + log.info("Failed to terminate ES Client"); + clients.invalidate(TenantContext.getTenantId()); + } + } + }catch (Exception e){ + log.error("Failed to terminate/invalide client", e); + } + } + @PostConstruct protected void createCache() { @@ -45,8 +68,12 @@ public class OpensearchClientCache { .maximumSize(maximumSize) .expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES) .removalListener((RemovalListener) removal -> { - removal.getValue().shutdown(); - log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + try { + removal.getValue().terminate(); + log.info("Closed elasticsearch client for tenant {}", removal.getKey()); + } catch (Exception e) { + log.info("Failed to close elasticsearch client for tenant {}", removal.getKey()); + } }) .build(new CacheLoader<>() { public OpensearchClient load(String tenantId) { diff --git a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java index 8e44bf2..e6684f8 100644 --- a/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java +++ b/search-service-v1/search-service-server-v1/src/main/java/com/iqser/red/service/search/v1/server/service/opensearch/SearchServiceImpl.java @@ -110,6 +110,7 @@ public class SearchServiceImpl implements SearchService { try { return clientCache.getClient().search(searchRequest, IndexDocument.class); } catch (IOException e) { + clientCache.isClientAliveOrTerminate(); throw IndexException.searchFailed(e); } }