Pull request #74: RED-4646: Multitenacy for elasticsearch and opensearch

Merge in RED/search-service from RED-4646 to master

* commit '27c636fb36a2e2e82029aff5c3aca9fb7401d40e':
  RED-4646: Added request interceptor that sends x-tenant-id on each client call
  RED-4646: Removed default tenant
  RED-4646: Run migration for all tenants
  RED-4646: Multitenacy for elasticsearch and opensearch
This commit is contained in:
Dominique Eiflaender 2023-03-09 16:18:48 +01:00
commit b89a04cd3c
34 changed files with 730 additions and 216 deletions

View File

@ -12,10 +12,15 @@
<artifactId>search-service-server-v1</artifactId>
<properties>
<persistence-service.version>1.356.0</persistence-service.version>
<persistence-service.version>1.369.0</persistence-service.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>com.iqser.red.commons</groupId>
<artifactId>storage-commons</artifactId>

View File

@ -3,23 +3,23 @@ package com.iqser.red.service.search.v1.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import com.iqser.red.commons.spring.DefaultWebMvcConfiguration;
import com.iqser.red.service.search.v1.server.client.FileStatusClient;
import com.iqser.red.service.search.v1.server.service.opensearch.OpensearchClient;
import com.iqser.red.service.search.v1.server.multitenancy.AsyncConfig;
import com.iqser.red.service.search.v1.server.multitenancy.MultiTenancyMessagingConfiguration;
import com.iqser.red.service.search.v1.server.multitenancy.MultiTenancyWebConfiguration;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
import com.iqser.red.service.search.v1.server.settings.SearchServiceSettings;
import io.micrometer.core.aop.TimedAspect;
import io.micrometer.core.instrument.MeterRegistry;
@Import({DefaultWebMvcConfiguration.class})
@Import({MultiTenancyWebConfiguration.class, AsyncConfig.class, MultiTenancyMessagingConfiguration.class})
@EnableFeignClients(basePackageClasses = FileStatusClient.class)
@EnableConfigurationProperties({ElasticsearchSettings.class, SearchServiceSettings.class})
@SpringBootApplication(exclude = {SecurityAutoConfiguration.class, ManagementWebSecurityAutoConfiguration.class})
@ -31,14 +31,6 @@ public class Application {
}
@Bean
@ConditionalOnMissingBean
public OpensearchClient elasticsearchClient(ElasticsearchSettings elasticsearchSettings) {
return new OpensearchClient(elasticsearchSettings);
}
@Bean
public TimedAspect timedAspect(MeterRegistry registry) {

View File

@ -0,0 +1,10 @@
package com.iqser.red.service.search.v1.server.client;
import org.springframework.cloud.openfeign.FeignClient;
import com.iqser.red.service.persistence.service.v1.api.resources.TenantsResource;
@FeignClient(name = "TenantsResource", url = "${persistence-service.url}")
public interface TenantsClient extends TenantsResource {
}

View File

@ -8,6 +8,8 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.model.IndexMessage;
import com.iqser.red.service.search.v1.model.IndexMessageType;
import com.iqser.red.service.search.v1.server.client.TenantsClient;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.queue.IndexingMessageReceiver;
import com.iqser.red.service.search.v1.server.service.IndexInformationService;
import com.iqser.red.service.search.v1.server.settings.SearchServiceSettings;
@ -24,6 +26,7 @@ public class MigrationStarterService {
private final IndexInformationService indexInformationService;
private final IndexingMessageReceiver indexingMessageReceiver;
private final SearchServiceSettings settings;
private final TenantsClient tenantsClient;
@EventListener(ApplicationReadyEvent.class)
@ -31,10 +34,13 @@ public class MigrationStarterService {
// This can only run in post upgrade hook, because otherwise the old service is still runnnig.
if (settings.isMigrateOnly()) {
if (indexInformationService.hasIndexChanged()) {
log.info("Index has changed and will be closed, dropped, recreated and all files will be indexed");
indexingMessageReceiver.receiveIndexingRequest(IndexMessage.builder().messageType(IndexMessageType.DROP).build());
}
tenantsClient.getTenants().forEach(tenant -> {
TenantContext.setTenantId(tenant.getTenantId());
if (indexInformationService.hasIndexChanged()) {
log.info("Index has changed and will be closed, dropped, recreated and all files will be indexed");
indexingMessageReceiver.receiveIndexingRequest(IndexMessage.builder().messageType(IndexMessageType.DROP).build());
}
});
System.exit(SpringApplication.exit(ctx, () -> 0));
}
}

View File

@ -0,0 +1,27 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AsyncConfig extends AsyncConfigurerSupport {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("TenantAwareTaskExecutor-");
executor.setTaskDecorator(new TenantAwareTaskDecorator());
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,105 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.security.spec.KeySpec;
import java.util.Base64;
import javax.annotation.PostConstruct;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import lombok.SneakyThrows;
@Service
public class EncryptionDecryptionService {
@Value("${search-service.crypto.key:redaction}")
private String key;
private SecretKey secretKey;
private byte[] iv;
@SneakyThrows
@PostConstruct
protected void postConstruct() {
SecureRandom secureRandom = new SecureRandom();
iv = new byte[12];
secureRandom.nextBytes(iv);
secretKey = generateSecretKey(key, iv);
}
@SneakyThrows
public String encrypt(String strToEncrypt) {
return Base64.getEncoder().encodeToString(encrypt(strToEncrypt.getBytes()));
}
@SneakyThrows
public String decrypt(String strToDecrypt) {
byte[] bytes = Base64.getDecoder().decode(strToDecrypt);
return new String(decrypt(bytes), StandardCharsets.UTF_8);
}
@SneakyThrows
public byte[] encrypt(byte[] data) {
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
GCMParameterSpec parameterSpec = new GCMParameterSpec(128, iv);
cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
byte[] encryptedData = cipher.doFinal(data);
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + iv.length + encryptedData.length);
byteBuffer.putInt(iv.length);
byteBuffer.put(iv);
byteBuffer.put(encryptedData);
return byteBuffer.array();
}
@SneakyThrows
public byte[] decrypt(byte[] encryptedData) {
ByteBuffer byteBuffer = ByteBuffer.wrap(encryptedData);
int noonceSize = byteBuffer.getInt();
if (noonceSize < 12 || noonceSize >= 16) {
throw new IllegalArgumentException("Nonce size is incorrect. Make sure that the incoming data is an AES encrypted file.");
}
byte[] iv = new byte[noonceSize];
byteBuffer.get(iv);
SecretKey secretKey = generateSecretKey(key, iv);
byte[] cipherBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(cipherBytes);
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
GCMParameterSpec parameterSpec = new GCMParameterSpec(128, iv);
cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
return cipher.doFinal(cipherBytes);
}
@SneakyThrows
public SecretKey generateSecretKey(String password, byte[] iv) {
KeySpec spec = new PBEKeySpec(password.toCharArray(), iv, 65536, 128); // AES-128
SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1");
byte[] key = secretKeyFactory.generateSecret(spec).getEncoded();
return new SecretKeySpec(key, "AES");
}
}

View File

@ -0,0 +1,18 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import org.springframework.stereotype.Component;
import feign.RequestInterceptor;
import feign.RequestTemplate;
@Component
public class ForwardTenantInterceptor implements RequestInterceptor {
public static final String TENANT_HEADER_NAME = "X-TENANT-ID";
@Override
public void apply(RequestTemplate template) {
// do something
template.header(TENANT_HEADER_NAME, TenantContext.getTenantId());
}
}

View File

@ -0,0 +1,49 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import static com.iqser.red.service.search.v1.server.multitenancy.TenantInterceptor.TENANT_HEADER_NAME;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MultiTenancyMessagingConfiguration {
@Bean
public static BeanPostProcessor multitenancyBeanPostProcessor() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
m.getMessageProperties().setHeader(TENANT_HEADER_NAME, TenantContext.getTenantId());
return m;
});
} else if (bean instanceof AbstractRabbitListenerContainerFactory) {
((AbstractRabbitListenerContainerFactory<?>) bean).setAfterReceivePostProcessors(m -> {
String tenant = m.getMessageProperties().getHeader(TENANT_HEADER_NAME);
if (tenant != null) {
TenantContext.setTenantId(tenant);
} else {
throw new RuntimeException("No Tenant is set queue message");
}
return m;
});
}
return bean;
}
};
}
}

View File

@ -0,0 +1,28 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import com.iqser.red.commons.spring.DefaultWebMvcConfiguration;
@Configuration
public class MultiTenancyWebConfiguration extends DefaultWebMvcConfiguration {
private final TenantInterceptor tenantInterceptor;
@Autowired
public MultiTenancyWebConfiguration(TenantInterceptor tenantInterceptor) {
this.tenantInterceptor = tenantInterceptor;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addWebRequestInterceptor(tenantInterceptor);
}
}

View File

@ -0,0 +1,23 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import org.springframework.core.task.TaskDecorator;
import org.springframework.lang.NonNull;
public class TenantAwareTaskDecorator implements TaskDecorator {
@Override
@NonNull
public Runnable decorate(@NonNull Runnable runnable) {
String tenantId = TenantContext.getTenantId();
return () -> {
try {
TenantContext.setTenantId(tenantId);
runnable.run();
} finally {
TenantContext.setTenantId(null);
}
};
}
}

View File

@ -0,0 +1,29 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class TenantContext {
private static InheritableThreadLocal<String> currentTenant = new InheritableThreadLocal<>();
public static void setTenantId(String tenantId) {
log.debug("Setting tenantId to " + tenantId);
currentTenant.set(tenantId);
}
public static String getTenantId() {
return currentTenant.get();
}
public static void clear() {
currentTenant.remove();
}
}

View File

@ -0,0 +1,35 @@
package com.iqser.red.service.search.v1.server.multitenancy;
import org.springframework.stereotype.Component;
import org.springframework.ui.ModelMap;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.context.request.WebRequestInterceptor;
@Component
public class TenantInterceptor implements WebRequestInterceptor {
public static final String TENANT_HEADER_NAME = "X-TENANT-ID";
@Override
public void preHandle(WebRequest request) {
if (request.getHeader(TENANT_HEADER_NAME) != null) {
TenantContext.setTenantId(request.getHeader(TENANT_HEADER_NAME));
}
}
@Override
public void postHandle(WebRequest request, ModelMap model) {
TenantContext.clear();
}
@Override
public void afterCompletion(WebRequest request, Exception ex) {
}
}

View File

@ -27,7 +27,6 @@ import com.iqser.red.service.search.v1.server.model.Text;
import com.iqser.red.service.search.v1.server.service.DocumentDeleteService;
import com.iqser.red.service.search.v1.server.service.DocumentIndexService;
import com.iqser.red.service.search.v1.server.service.DocumentUpdateService;
import com.iqser.red.service.search.v1.server.service.IndexCreatorService;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import com.iqser.red.service.search.v1.server.service.IndexDocumentConverterService;
import com.iqser.red.service.search.v1.server.service.IndexInformationService;
@ -51,7 +50,6 @@ public class IndexingMessageReceiver {
private final DocumentDeleteService documentDeleteService;
private final DocumentUpdateService documentUpdateService;
private final IndexCreatorService indexCreatorService;
private final DocumentIndexService documentIndexService;
private final IndexDeleteService indexDeleteService;
private final IndexInformationService indexInformationService;
@ -102,9 +100,7 @@ public class IndexingMessageReceiver {
break;
case DROP:
indexDeleteService.closeIndex();
indexDeleteService.dropIndex();
indexCreatorService.createIndex();
indexDeleteService.recreateIndex();
addAllDocumentsToIndexQueue();
try {
indexInformationService.updateIndexInformation();

View File

@ -1,7 +0,0 @@
package com.iqser.red.service.search.v1.server.service;
public interface IndexCreatorService {
void createIndex();
}

View File

@ -2,6 +2,9 @@ package com.iqser.red.service.search.v1.server.service;
public interface IndexDeleteService {
void recreateIndex();
void closeIndex();

View File

@ -1,13 +1,12 @@
package com.iqser.red.service.search.v1.server.service.elasticsearch;
import static com.iqser.red.service.search.v1.server.service.elasticsearch.IndexCreatorServiceImpl.INDEX_NAME;
import java.io.IOException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.DocumentDeleteService;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
@ -21,16 +20,16 @@ import lombok.RequiredArgsConstructor;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class DocumentDeleteServiceImpl implements DocumentDeleteService {
private final EsClient client;
private final EsClientCache clientCache;
private final ElasticsearchSettings settings;
public void deleteDocument(String fileId) {
DeleteRequest request = new DeleteRequest.Builder().index(INDEX_NAME).id(fileId).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())).build();
DeleteRequest request = new DeleteRequest.Builder().index(TenantContext.getTenantId()).id(fileId).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())).build();
try {
client.delete(request);
clientCache.getClient().delete(request);
} catch (IOException | ElasticsearchException e) {
throw IndexException.documentDeleteError(fileId, e);
}

View File

@ -1,7 +1,5 @@
package com.iqser.red.service.search.v1.server.service.elasticsearch;
import static com.iqser.red.service.search.v1.server.service.elasticsearch.IndexCreatorServiceImpl.INDEX_NAME;
import java.io.IOException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -9,6 +7,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.model.IndexDocument;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.DocumentIndexService;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
@ -24,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class DocumentIndexServiceImpl implements DocumentIndexService {
private final EsClient client;
private final EsClientCache clientCache;
private final ElasticsearchSettings settings;
@ -32,10 +31,11 @@ public class DocumentIndexServiceImpl implements DocumentIndexService {
public void indexDocument(IndexDocument indexDocument) {
try {
client.index(i -> i.index(INDEX_NAME)
.id(indexDocument.getFileId())
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy()))
.document(indexDocument));
clientCache.getClient()
.index(i -> i.index(TenantContext.getTenantId())
.id(indexDocument.getFileId())
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy()))
.document(indexDocument));
} catch (IOException | ElasticsearchException e) {
throw IndexException.documentIndexError(indexDocument.getFileId(), e);
}

View File

@ -1,7 +1,5 @@
package com.iqser.red.service.search.v1.server.service.elasticsearch;
import static com.iqser.red.service.search.v1.server.service.elasticsearch.IndexCreatorServiceImpl.INDEX_NAME;
import java.io.IOException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -9,6 +7,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.model.IndexDocumentUpdate;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.DocumentUpdateService;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
@ -23,7 +22,7 @@ import lombok.SneakyThrows;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class DocumentUpdateServiceImpl implements DocumentUpdateService {
private final EsClient client;
private final EsClientCache clientCache;
private final ElasticsearchSettings settings;
@ -32,7 +31,9 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService {
public void updateDocument(String fileId, IndexDocumentUpdate indexDocumentUpdate) {
try {
client.update(u -> u.index(INDEX_NAME).id(fileId).doc(indexDocumentUpdate).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())), IndexDocumentUpdate.class);
clientCache.getClient()
.update(u -> u.index(TenantContext.getTenantId()).id(fileId).doc(indexDocumentUpdate).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())),
IndexDocumentUpdate.class);
} catch (IOException | ElasticsearchException e) {
throw IndexException.documentUpdateError(fileId, e);
}

View File

@ -2,9 +2,6 @@ package com.iqser.red.service.search.v1.server.service.elasticsearch;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@ -12,62 +9,49 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
import com.iqser.red.service.persistence.service.v1.api.model.multitenancy.SearchConnection;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.RequiredArgsConstructor;
import lombok.Data;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
@Data
public class EsClient {
// Lower timeouts should be set per request.
private static final int ABSURD_HIGH_TIMEOUT = 90_000_000;
private final ElasticsearchSettings settings;
private SearchConnection searchConnection;
@Delegate
private co.elastic.clients.elasticsearch.ElasticsearchClient client;
private ElasticsearchClient elasticsearchClient;
@PostConstruct
public void init() {
public EsClient(SearchConnection searchConnection) {
HttpHost[] httpHost = settings.getHosts()
HttpHost[] httpHost = searchConnection.getHosts()
.stream()
.map(host -> new HttpHost(host, settings.getPort(), settings.getScheme()))
.map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme()))
.collect(Collectors.toList())
.toArray(new HttpHost[settings.getHosts().size()]);
.toArray(new HttpHost[searchConnection.getHosts().size()]);
RestClientBuilder builder = RestClient.builder(httpHost)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT));
if (settings.getUsername() != null && !settings.getUsername().isEmpty()) {
if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(settings.getUsername(), settings.getPassword()));
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(searchConnection.getUsername(), searchConnection.getPassword()));
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
ElasticsearchTransport transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper());
this.client = new co.elastic.clients.elasticsearch.ElasticsearchClient(transport);
this.searchConnection = searchConnection;
this.elasticsearchClient = new ElasticsearchClient(transport);
}
@PreDestroy
public void onShutdown() {
client.shutdown();
}
}
}

View File

@ -0,0 +1,86 @@
package com.iqser.red.service.search.v1.server.service.elasticsearch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.iqser.red.service.search.v1.server.client.TenantsClient;
import com.iqser.red.service.search.v1.server.multitenancy.EncryptionDecryptionService;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class EsClientCache {
private final TenantsClient tenantsClient;
private final EncryptionDecryptionService encryptionDecryptionService;
private final IndexCreatorServiceImpl indexCreatorService;
@Value("${multitenancy.client-cache.maximumSize:100}")
private Long maximumSize;
@Value("${multitenancy.client-cache.expireAfterAccess:10}")
private Integer expireAfterAccess;
private LoadingCache<String, EsClient> clients;
@PostConstruct
protected void createCache() {
clients = CacheBuilder.newBuilder()
.maximumSize(maximumSize)
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, EsClient>) removal -> {
var elasticsearchClient = removal.getValue();
elasticsearchClient.shutdown();
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
})
.build(new CacheLoader<>() {
public EsClient load(String key) {
var tenant = tenantsClient.getTenant(key);
// Do not create new client if client with equal hosts is already available.
var hostsAsString = tenant.getSearchConnection().getHosts().stream().collect(Collectors.joining());
for (var client : clients.asMap().values()) {
if (client.getSearchConnection().getHosts().stream().collect(Collectors.joining()).equals(hostsAsString)) {
indexCreatorService.createIndex(client);
return client;
}
}
if (tenant.getSearchConnection().getPassword() != null) {
tenant.getSearchConnection().setPassword(encryptionDecryptionService.decrypt(tenant.getSearchConnection().getPassword()));
}
var client = new EsClient(tenant.getSearchConnection());
log.info("Initialized elasticsearch client for tenant {}", key);
indexCreatorService.createIndex(client);
return client;
}
});
}
@SneakyThrows
public EsClient getClient() {
return clients.get(TenantContext.getTenantId());
}
}

View File

@ -9,52 +9,42 @@ import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.service.IndexCreatorService;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import co.elastic.clients.elasticsearch.indices.MappingLimitSettingsNestedObjects;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class IndexCreatorServiceImpl implements IndexCreatorService {
public class IndexCreatorServiceImpl {
public static final String INDEX_NAME = "redaction";
private final EsClient client;
private final ElasticsearchSettings settings;
public IndexCreatorServiceImpl(EsClient client, ElasticsearchSettings settings) {
public void createIndex(EsClient esClient) {
this.client = client;
this.settings = settings;
if (!indexExists()) {
createIndex();
if (!indexExists(esClient)) {
try {
var response = esClient.indices().create(i -> i.index(TenantContext.getTenantId()).settings(createIndexSettings(esClient)).mappings(createIndexMapping()));
log.info("Successfully created index: {}", response.index());
} catch (IOException e) {
log.error("Failed to create index.", e);
}
}
}
public void createIndex() {
private boolean indexExists(EsClient esClient) {
try {
var response = client.indices().create(i -> i.index(INDEX_NAME).settings(createIndexSettings()).mappings(createIndexMapping()));
log.info("Successfully created index: {}", response.index());
} catch (IOException e) {
log.error("Failed to create index.", e);
}
}
private boolean indexExists() {
try {
var response = client.indices().exists(i -> i.index(INDEX_NAME));
var response = esClient.indices().exists(i -> i.index(TenantContext.getTenantId()));
return response.value();
} catch (IOException e) {
throw IndexException.indexExists(e);
@ -74,14 +64,14 @@ public class IndexCreatorServiceImpl implements IndexCreatorService {
@SneakyThrows
private IndexSettings createIndexSettings() {
private IndexSettings createIndexSettings(EsClient esClient) {
URL resource = ResourceLoader.class.getClassLoader().getResource("index/settings.json");
try (InputStream is = resource.openStream()) {
return new IndexSettings.Builder().withJson(is)
.numberOfShards(settings.getNumberOfShards())
.numberOfReplicas(settings.getNumberOfReplicas())
.numberOfShards(esClient.getSearchConnection().getNumberOfShards())
.numberOfReplicas(esClient.getSearchConnection().getNumberOfReplicas())
.mapping(m -> m.nestedObjects(MappingLimitSettingsNestedObjects.of(a -> a.limit(settings.getNumberOfNestedObjectLimit()))))
.build();
}

View File

@ -1,11 +1,10 @@
package com.iqser.red.service.search.v1.server.service.elasticsearch;
import static com.iqser.red.service.search.v1.server.service.elasticsearch.IndexCreatorServiceImpl.INDEX_NAME;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import lombok.RequiredArgsConstructor;
@ -18,13 +17,22 @@ import lombok.extern.slf4j.Slf4j;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class IndexDeleteServiceImpl implements IndexDeleteService {
private final EsClient client;
private final EsClientCache clientCache;
private final IndexCreatorServiceImpl indexCreatorService;
public void recreateIndex() {
closeIndex();
dropIndex();
indexCreatorService.createIndex(clientCache.getClient());
}
@SneakyThrows
public void closeIndex() {
var closeIndexResponse = client.indices().close(i -> i.index(INDEX_NAME).timeout(t -> t.time("2m")));
var closeIndexResponse = clientCache.getClient().indices().close(i -> i.index(TenantContext.getTenantId()).timeout(t -> t.time("2m")));
if (closeIndexResponse.acknowledged()) {
log.info("Index is closed");
} else {
@ -37,7 +45,7 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
public void dropIndex() {
log.info("Will drop index");
var deleteIndexResponse = client.indices().delete(i -> i.index(INDEX_NAME).timeout(t -> t.time("2m")));
var deleteIndexResponse = clientCache.getClient().indices().delete(i -> i.index(TenantContext.getTenantId()).timeout(t -> t.time("2m")));
if (deleteIndexResponse.acknowledged()) {
log.info("Index is dropped");

View File

@ -46,7 +46,7 @@ import lombok.extern.slf4j.Slf4j;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "elasticsearch")
public class SearchServiceImpl implements SearchService {
private final EsClient client;
private final EsClientCache clientCache;
@Timed("redactmanager_search")
@ -105,7 +105,7 @@ public class SearchServiceImpl implements SearchService {
protected SearchResponse<IndexDocument> execute(SearchRequest searchRequest) {
try {
return client.search(searchRequest, IndexDocument.class);
return clientCache.getClient().search(searchRequest, IndexDocument.class);
} catch (IOException e) {
throw IndexException.searchFailed(e);
}
@ -306,7 +306,7 @@ public class SearchServiceImpl implements SearchService {
var pages = IntStream.range(0, jsonArray.size()).mapToObj(i -> jsonArray.getInt(i)).collect(Collectors.toSet());
return MatchedSection.builder()
.headline(indexSection.get("headline") != null ? indexSection.getString("headline"): null)
.headline(indexSection.get("headline") != null ? indexSection.getString("headline") : null)
.sectionNumber(indexSection.getInt("sectionNumber"))
.pages(pages)
.matchedTerms(hit.matchedQueries().stream().collect(Collectors.toSet()))

View File

@ -1,7 +1,5 @@
package com.iqser.red.service.search.v1.server.service.opensearch;
import static com.iqser.red.service.search.v1.server.service.opensearch.IndexCreatorServiceImpl.INDEX_NAME;
import java.io.IOException;
import org.opensearch.client.opensearch._types.OpenSearchException;
@ -11,6 +9,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.DocumentDeleteService;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
@ -21,16 +20,16 @@ import lombok.RequiredArgsConstructor;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class DocumentDeleteServiceImpl implements DocumentDeleteService {
private final OpensearchClient client;
private final OpensearchClientCache clientCache;
private final ElasticsearchSettings settings;
public void deleteDocument(String fileId) {
DeleteRequest request = new DeleteRequest.Builder().index(INDEX_NAME).id(fileId).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())).build();
DeleteRequest request = new DeleteRequest.Builder().index(TenantContext.getTenantId()).id(fileId).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())).build();
try {
client.delete(request);
clientCache.getClient().delete(request);
} catch (IOException | OpenSearchException e) {
throw IndexException.documentDeleteError(fileId, e);
}

View File

@ -1,7 +1,5 @@
package com.iqser.red.service.search.v1.server.service.opensearch;
import static com.iqser.red.service.search.v1.server.service.opensearch.IndexCreatorServiceImpl.INDEX_NAME;
import java.io.IOException;
import org.opensearch.client.opensearch._types.OpenSearchException;
@ -11,6 +9,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.model.IndexDocument;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.DocumentIndexService;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
@ -24,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class DocumentIndexServiceImpl implements DocumentIndexService {
private final OpensearchClient client;
private final OpensearchClientCache clientCache;
private final ElasticsearchSettings settings;
@ -32,7 +31,10 @@ public class DocumentIndexServiceImpl implements DocumentIndexService {
public void indexDocument(IndexDocument indexDocument) {
try {
client.index(i -> i.index(INDEX_NAME).id(indexDocument.getFileId()).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())).document(indexDocument));
clientCache.getClient().index(i -> i.index(TenantContext.getTenantId())
.id(indexDocument.getFileId())
.refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy()))
.document(indexDocument));
} catch (IOException | OpenSearchException e) {
throw IndexException.documentIndexError(indexDocument.getFileId(), e);
}

View File

@ -1,7 +1,5 @@
package com.iqser.red.service.search.v1.server.service.opensearch;
import static com.iqser.red.service.search.v1.server.service.opensearch.IndexCreatorServiceImpl.INDEX_NAME;
import java.io.IOException;
import org.opensearch.client.opensearch._types.OpenSearchException;
@ -11,6 +9,7 @@ import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.model.IndexDocumentUpdate;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.DocumentUpdateService;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
@ -23,7 +22,7 @@ import lombok.SneakyThrows;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class DocumentUpdateServiceImpl implements DocumentUpdateService {
private final OpensearchClient client;
private final OpensearchClientCache clientCache;
private final ElasticsearchSettings settings;
@ -32,8 +31,9 @@ public class DocumentUpdateServiceImpl implements DocumentUpdateService {
public void updateDocument(String fileId, IndexDocumentUpdate indexDocumentUpdate) {
try {
client.update(u -> u.index(INDEX_NAME).id(fileId).doc(indexDocumentUpdate).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())),
IndexDocumentUpdate.class);
clientCache.getClient()
.update(u -> u.index(TenantContext.getTenantId()).id(fileId).doc(indexDocumentUpdate).refresh(Refresh._DESERIALIZER.parse(settings.getRefreshPolicy())),
IndexDocumentUpdate.class);
} catch (IOException | OpenSearchException e) {
throw IndexException.documentUpdateError(fileId, e);
}

View File

@ -12,50 +12,41 @@ import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.service.IndexCreatorService;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
import jakarta.json.stream.JsonParser;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class IndexCreatorServiceImpl implements IndexCreatorService {
public class IndexCreatorServiceImpl {
public static final String INDEX_NAME = "redaction";
private final OpensearchClient client;
private final ElasticsearchSettings settings;
public IndexCreatorServiceImpl(OpensearchClient client, ElasticsearchSettings settings) {
public void createIndex(OpensearchClient client) {
this.client = client;
this.settings = settings;
if (!indexExists(client)) {
if (!indexExists()) {
createIndex();
try {
var response = client.indices().create(i -> i.index(TenantContext.getTenantId()).settings(createIndexSettings(client)).mappings(createIndexMapping(client)));
log.info("Successfully created index: {}", response.index());
} catch (IOException e) {
log.error("Failed to create index.", e);
}
}
}
public void createIndex() {
private boolean indexExists(OpensearchClient client) {
try {
var response = client.indices().create(i -> i.index(INDEX_NAME).settings(createIndexSettings()).mappings(createIndexMapping()));
log.info("Successfully created index: {}", response.index());
} catch (IOException e) {
log.error("Failed to create index.", e);
}
}
private boolean indexExists() {
try {
var response = client.indices().exists(i -> i.index(INDEX_NAME));
var response = client.indices().exists(i -> i.index(TenantContext.getTenantId()));
return response.value();
} catch (IOException e) {
throw IndexException.indexExists(e);
@ -64,7 +55,7 @@ public class IndexCreatorServiceImpl implements IndexCreatorService {
@SneakyThrows
private TypeMapping createIndexMapping() {
private TypeMapping createIndexMapping(OpensearchClient client) {
URL resource = ResourceLoader.class.getClassLoader().getResource("index/mapping.json");
@ -79,7 +70,7 @@ public class IndexCreatorServiceImpl implements IndexCreatorService {
@SneakyThrows
private IndexSettings createIndexSettings() {
private IndexSettings createIndexSettings(OpensearchClient client) {
URL resource = ResourceLoader.class.getClassLoader().getResource("index/settings.json");
@ -94,8 +85,8 @@ public class IndexCreatorServiceImpl implements IndexCreatorService {
// Hopefully they don't hava a limit for this, I was not able to find anything.
// As elasticsearch has a limit for this, and we can't set it, it seems this is the only reason for now to have both clients.
var indexSettings = new IndexSettings.Builder().index(indexSettingsFromJson.index())
.numberOfReplicas(settings.getNumberOfReplicas())
.numberOfShards(settings.getNumberOfShards())
.numberOfReplicas(client.getSearchConnection().getNumberOfReplicas())
.numberOfShards(client.getSearchConnection().getNumberOfShards())
.analysis(indexSettingsFromJson.analysis())
.build();

View File

@ -1,11 +1,10 @@
package com.iqser.red.service.search.v1.server.service.opensearch;
import static com.iqser.red.service.search.v1.server.service.opensearch.IndexCreatorServiceImpl.INDEX_NAME;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.exception.IndexException;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.service.search.v1.server.service.IndexDeleteService;
import lombok.RequiredArgsConstructor;
@ -18,13 +17,22 @@ import lombok.extern.slf4j.Slf4j;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class IndexDeleteServiceImpl implements IndexDeleteService {
private final OpensearchClient client;
private final OpensearchClientCache clientCache;
private final IndexCreatorServiceImpl indexCreatorService;
public void recreateIndex() {
closeIndex();
dropIndex();
indexCreatorService.createIndex(clientCache.getClient());
}
@SneakyThrows
public void closeIndex() {
var closeIndexResponse = client.indices().close(i -> i.index(INDEX_NAME).timeout(t -> t.time("2m")));
var closeIndexResponse = clientCache.getClient().indices().close(i -> i.index(TenantContext.getTenantId()).timeout(t -> t.time("2m")));
if (closeIndexResponse.acknowledged()) {
log.info("Index is closed");
} else {
@ -37,7 +45,7 @@ public class IndexDeleteServiceImpl implements IndexDeleteService {
public void dropIndex() {
log.info("Will drop index");
var deleteIndexResponse = client.indices().delete(i -> i.index(INDEX_NAME).timeout(t -> t.time("2m")));
var deleteIndexResponse = clientCache.getClient().indices().delete(i -> i.index(TenantContext.getTenantId()).timeout(t -> t.time("2m")));
if (deleteIndexResponse.acknowledged()) {
log.info("Index is dropped");

View File

@ -2,7 +2,6 @@ package com.iqser.red.service.search.v1.server.service.opensearch;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.http.HttpHost;
@ -15,52 +14,45 @@ import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.iqser.red.service.search.v1.server.settings.ElasticsearchSettings;
import com.iqser.red.service.persistence.service.v1.api.model.multitenancy.SearchConnection;
import lombok.RequiredArgsConstructor;
import lombok.Data;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
@Data
public class OpensearchClient {
// Lower timeouts should be set per request.
private static final int ABSURD_HIGH_TIMEOUT = 90_000_000;
private final ElasticsearchSettings settings;
private SearchConnection searchConnection;
@Delegate
private OpenSearchClient client;
@PostConstruct
public void init() {
public OpensearchClient(SearchConnection searchConnection) {
HttpHost[] httpHost = settings.getHosts()
HttpHost[] httpHost = searchConnection.getHosts()
.stream()
.map(host -> new HttpHost(host, settings.getPort(), settings.getScheme()))
.map(host -> new HttpHost(host, searchConnection.getPort(), searchConnection.getScheme()))
.collect(Collectors.toList())
.toArray(new HttpHost[settings.getHosts().size()]);
.toArray(new HttpHost[searchConnection.getHosts().size()]);
RestClientBuilder builder = RestClient.builder(httpHost)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(ABSURD_HIGH_TIMEOUT).setSocketTimeout(ABSURD_HIGH_TIMEOUT));
if (settings.getUsername() != null && !settings.getUsername().isEmpty()) {
if (searchConnection.getUsername() != null && !searchConnection.getUsername().isEmpty()) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(settings.getUsername(), settings.getPassword()));
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(searchConnection.getUsername(), searchConnection.getPassword()));
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
var transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper());
this.searchConnection = searchConnection;
this.client = new OpenSearchClient(transport);
}

View File

@ -0,0 +1,86 @@
package com.iqser.red.service.search.v1.server.service.opensearch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.iqser.red.service.search.v1.server.client.TenantsClient;
import com.iqser.red.service.search.v1.server.multitenancy.EncryptionDecryptionService;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class OpensearchClientCache {
private final TenantsClient tenantsClient;
private final EncryptionDecryptionService encryptionDecryptionService;
private final IndexCreatorServiceImpl indexCreatorService;
@Value("${multitenancy.client-cache.maximumSize:100}")
private Long maximumSize;
@Value("${multitenancy.client-cache.expireAfterAccess:10}")
private Integer expireAfterAccess;
private LoadingCache<String, OpensearchClient> clients;
@PostConstruct
protected void createCache() {
clients = CacheBuilder.newBuilder()
.maximumSize(maximumSize)
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, OpensearchClient>) removal -> {
var client = removal.getValue();
client.shutdown();
log.info("Closed opensearch client for tenant {}", removal.getKey());
})
.build(new CacheLoader<>() {
public OpensearchClient load(String key) {
var tenant = tenantsClient.getTenant(key);
// Do not create new client if client with equal hosts is already available.
var hostsAsString = tenant.getSearchConnection().getHosts().stream().collect(Collectors.joining());
for (var client : clients.asMap().values()) {
if (client.getSearchConnection().getHosts().stream().collect(Collectors.joining()).equals(hostsAsString)) {
indexCreatorService.createIndex(client);
return client;
}
}
if (tenant.getSearchConnection().getPassword() != null) {
tenant.getSearchConnection().setPassword(encryptionDecryptionService.decrypt(tenant.getSearchConnection().getPassword()));
}
var client = new OpensearchClient(tenant.getSearchConnection());
log.info("Initialized elasticsearch client for tenant {}", key);
indexCreatorService.createIndex(client);
return client;
}
});
}
@SneakyThrows
public OpensearchClient getClient() {
return clients.get(TenantContext.getTenantId());
}
}

View File

@ -47,7 +47,7 @@ import lombok.extern.slf4j.Slf4j;
@ConditionalOnProperty(prefix = "search", name = "backend", havingValue = "opensearch")
public class SearchServiceImpl implements SearchService {
private final OpensearchClient client;
private final OpensearchClientCache clientCache;
@Timed("redactmanager_search")
@ -106,7 +106,7 @@ public class SearchServiceImpl implements SearchService {
protected SearchResponse<IndexDocument> execute(SearchRequest searchRequest) {
try {
return client.search(searchRequest, IndexDocument.class);
return clientCache.getClient().search(searchRequest, IndexDocument.class);
} catch (IOException e) {
throw IndexException.searchFailed(e);
}
@ -131,9 +131,12 @@ public class SearchServiceImpl implements SearchService {
var textPhraseQuery = QueryBuilders.matchPhrase().field("sections.text").query(must.toLowerCase(Locale.ROOT)).queryName(must).build()._toQuery();
var filenamePhraseQuery = QueryBuilders.matchPhrasePrefix().field("filename").query(must.toLowerCase(Locale.ROOT)).queryName("filename." + must).build()._toQuery();
var fileAttributesPhraseQuery = QueryBuilders.matchPhrase().field("fileAttributes.value")
var fileAttributesPhraseQuery = QueryBuilders.matchPhrase()
.field("fileAttributes.value")
.query(must.toLowerCase(Locale.ROOT))
.queryName("fileAttributes." + must).build()._toQuery();
.queryName("fileAttributes." + must)
.build()
._toQuery();
var filenameOrTextMustQuery = QueryBuilders.bool().should(textPhraseQuery).should(filenamePhraseQuery).should(fileAttributesPhraseQuery).build()._toQuery();
entireQuery.must(filenameOrTextMustQuery);
@ -143,9 +146,12 @@ public class SearchServiceImpl implements SearchService {
var textTermQuery = QueryBuilders.matchPhrase().field("sections.text").query(should.toLowerCase(Locale.ROOT)).queryName(should).build()._toQuery();
var filenameTermQuery = QueryBuilders.matchPhrasePrefix().field("filename").query(should.toLowerCase(Locale.ROOT)).queryName("filename." + should).build()._toQuery();
var fileAttributesPhraseQuery = QueryBuilders.matchPhrase().field("fileAttributes.value")
var fileAttributesPhraseQuery = QueryBuilders.matchPhrase()
.field("fileAttributes.value")
.query(should.toLowerCase(Locale.ROOT))
.queryName("fileAttributes." + should).build()._toQuery();
.queryName("fileAttributes." + should)
.build()
._toQuery();
entireQuery.should(textTermQuery);
entireQuery.should(filenameTermQuery);
entireQuery.should(fileAttributesPhraseQuery);
@ -153,11 +159,14 @@ public class SearchServiceImpl implements SearchService {
}
if (returnSections) {
var nestedQuery = QueryBuilders.nested().scoreMode(ChildScoreMode.Avg)
var nestedQuery = QueryBuilders.nested()
.scoreMode(ChildScoreMode.Avg)
.queryName("sections")
.query(sectionsQueries.build()._toQuery())
.path("sections")
.innerHits(i -> i.size(100)).build()._toQuery();
.innerHits(i -> i.size(100))
.build()
._toQuery();
entireQuery.should(nestedQuery);
}
@ -169,7 +178,11 @@ public class SearchServiceImpl implements SearchService {
for (var dossierTemplateId : dossierTemplateIds) {
if (StringUtils.isNotEmpty(dossierTemplateId)) {
dossierTemplateIdQueryBuilder = dossierTemplateIdQueryBuilder.should(QueryBuilders.match().field("dossierTemplateId").query(q -> q.stringValue(dossierTemplateId)).build()._toQuery());
dossierTemplateIdQueryBuilder = dossierTemplateIdQueryBuilder.should(QueryBuilders.match()
.field("dossierTemplateId")
.query(q -> q.stringValue(dossierTemplateId))
.build()
._toQuery());
}
}
@ -198,15 +211,21 @@ public class SearchServiceImpl implements SearchService {
}
if (includeArchivedDossiers) {
filterQuery.must(QueryBuilders.terms().field("dossierArchived")
.terms(t -> t.value(List.of(new FieldValue.Builder().booleanValue(true).build(), new FieldValue.Builder().booleanValue(false).build()))).build()._toQuery());
filterQuery.must(QueryBuilders.terms()
.field("dossierArchived")
.terms(t -> t.value(List.of(new FieldValue.Builder().booleanValue(true).build(), new FieldValue.Builder().booleanValue(false).build())))
.build()
._toQuery());
} else {
filterQuery.must(QueryBuilders.terms().field("dossierArchived").terms(t -> t.value(List.of(new FieldValue.Builder().booleanValue(false).build()))).build()._toQuery());
}
if (includeDeletedDossiers) {
filterQuery.must(QueryBuilders.terms().field("dossierDeleted")
.terms(t -> t.value(List.of(new FieldValue.Builder().booleanValue(true).build(), new FieldValue.Builder().booleanValue(false).build()))).build()._toQuery());
filterQuery.must(QueryBuilders.terms()
.field("dossierDeleted")
.terms(t -> t.value(List.of(new FieldValue.Builder().booleanValue(true).build(), new FieldValue.Builder().booleanValue(false).build())))
.build()
._toQuery());
} else {
filterQuery.must(QueryBuilders.terms().field("dossierDeleted").terms(t -> t.value(List.of(new FieldValue.Builder().booleanValue(false).build()))).build()._toQuery());
}
@ -241,7 +260,7 @@ public class SearchServiceImpl implements SearchService {
return SearchResult.builder()
.matchedDocuments(hits.stream().map(hit -> convertSearchHit((Hit) hit, query)).collect(Collectors.toList()))
.maxScore(response.maxScore() == null ? 0 :response.maxScore().floatValue())
.maxScore(response.maxScore() == null ? 0 : response.maxScore().floatValue())
.total(response.hits().total().value())
.build();
}
@ -307,7 +326,7 @@ public class SearchServiceImpl implements SearchService {
var pages = IntStream.range(0, jsonArray.size()).mapToObj(i -> jsonArray.getInt(i)).collect(Collectors.toSet());
return MatchedSection.builder()
.headline(indexSection.get("headline") != null ? indexSection.getString("headline"): null)
.headline(indexSection.get("headline") != null ? indexSection.getString("headline") : null)
.sectionNumber(indexSection.getInt("sectionNumber"))
.pages(pages)
.matchedTerms(hit.matchedQueries().stream().collect(Collectors.toSet()))

View File

@ -1,8 +1,5 @@
package com.iqser.red.service.search.v1.server.settings;
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@ -15,19 +12,6 @@ import lombok.Data;
@ConfigurationProperties("elasticsearch")
public class ElasticsearchSettings {
private List<String> hosts = new ArrayList<>();
private int port = 9300;
private String scheme = "http";
private String apiKeyAuth;
private String username;
private String password;
private String numberOfShards = "5";
private String numberOfReplicas = "1";
private int numberOfNestedObjectLimit = 100000;
/**

View File

@ -1,11 +1,16 @@
package com.iqser.red.service.search.v1.server.service;
import static org.mockito.Mockito.when;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
@ -19,7 +24,11 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;
import com.iqser.red.service.persistence.service.v1.api.model.multitenancy.SearchConnection;
import com.iqser.red.service.persistence.service.v1.api.model.multitenancy.TenantResponse;
import com.iqser.red.service.search.v1.server.Application;
import com.iqser.red.service.search.v1.server.client.TenantsClient;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.storage.commons.StorageAutoConfiguration;
import com.iqser.red.storage.commons.service.StorageService;
@ -34,6 +43,22 @@ public abstract class AbstractElasticsearchIntegrationTest {
public static final String WAIT_FOR_WRITE_REQUESTS = "elasticsearch.refreshPolicy=wait_for";
public static final String SEARCH_BACKEND = "search.backend=elasticsearch";
@MockBean
private TenantsClient tenantsClient;
private static int port;
@BeforeEach
public void setupOptimize() {
TenantContext.setTenantId("redaction");
when(tenantsClient.getTenant("redaction")).thenReturn(TenantResponse.builder()
.searchConnection(SearchConnection.builder().hosts(Set.of("localhost")).port(port).scheme("http").numberOfShards("1").numberOfReplicas("5").build())
.build());
}
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
@ -42,10 +67,8 @@ public abstract class AbstractElasticsearchIntegrationTest {
esContainer.getEnvMap().put("xpack.security.enabled", "false");
esContainer.start();
String esHost = esContainer.getHttpHostAddress();
int port = Integer.parseInt(esHost.substring(esHost.lastIndexOf(':') + 1));
TestPropertyValues.of("elasticsearch.port=" + port).applyTo(configurableApplicationContext.getEnvironment());
var esHost = esContainer.getHttpHostAddress();
port = Integer.parseInt(esHost.substring(esHost.lastIndexOf(':') + 1));
}
}

View File

@ -1,12 +1,17 @@
package com.iqser.red.service.search.v1.server.service;
import static org.mockito.Mockito.when;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opensearch.testcontainers.OpensearchContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
@ -19,7 +24,11 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.utility.DockerImageName;
import com.iqser.red.service.persistence.service.v1.api.model.multitenancy.SearchConnection;
import com.iqser.red.service.persistence.service.v1.api.model.multitenancy.TenantResponse;
import com.iqser.red.service.search.v1.server.Application;
import com.iqser.red.service.search.v1.server.client.TenantsClient;
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
import com.iqser.red.storage.commons.StorageAutoConfiguration;
import com.iqser.red.storage.commons.service.StorageService;
@ -34,6 +43,22 @@ public abstract class AbstractOpensearchIntegrationTest {
public static final String WAIT_FOR_WRITE_REQUESTS = "elasticsearch.refreshPolicy=wait_for";
public static final String SEARCH_BACKEND = "search.backend=opensearch";
@MockBean
private TenantsClient tenantsClient;
private static int port;
@BeforeEach
public void setupOptimize() {
TenantContext.setTenantId("redaction");
when(tenantsClient.getTenant("redaction")).thenReturn(TenantResponse.builder()
.searchConnection(SearchConnection.builder().hosts(Set.of("localhost")).port(port).scheme("http").numberOfShards("1").numberOfReplicas("5").build())
.build());
}
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
@ -43,9 +68,7 @@ public abstract class AbstractOpensearchIntegrationTest {
esContainer.start();
String esHost = esContainer.getHttpHostAddress();
int port = Integer.parseInt(esHost.substring(esHost.lastIndexOf(':') + 1));
TestPropertyValues.of("elasticsearch.port=" + port).applyTo(configurableApplicationContext.getEnvironment());
port = Integer.parseInt(esHost.substring(esHost.lastIndexOf(':') + 1));
}
}