Pull request #76: RED-4645: Multitenancy for storage
Merge in RED/search-service from RED-4645 to master * commit '64e0c20334221f7319be1260c8e40c1c954d614a': RED-4645: Multitenancy for storage
This commit is contained in:
commit
b3c4e014c8
@ -28,7 +28,7 @@
|
||||
<dependency>
|
||||
<groupId>com.iqser.red</groupId>
|
||||
<artifactId>platform-commons-dependency</artifactId>
|
||||
<version>1.21.0</version>
|
||||
<version>1.22.0</version>
|
||||
<scope>import</scope>
|
||||
<type>pom</type>
|
||||
</dependency>
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
package com.iqser.red.service.search.v1.server.multitenancy;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.search.v1.server.client.TenantsClient;
|
||||
import com.iqser.red.storage.commons.model.AzureStorageConnection;
|
||||
import com.iqser.red.storage.commons.model.S3StorageConnection;
|
||||
import com.iqser.red.storage.commons.service.StorageConnectionProvider;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class StorageConnectionProviderImpl implements StorageConnectionProvider {
|
||||
|
||||
private final TenantsClient tenantsClient;
|
||||
private final EncryptionDecryptionService encryptionDecryptionService;
|
||||
|
||||
|
||||
@Override
|
||||
public AzureStorageConnection getAzureStorageConnection(String tenantId) {
|
||||
|
||||
var tenant = tenantsClient.getTenant(tenantId);
|
||||
return AzureStorageConnection.builder()
|
||||
.connectionString(encryptionDecryptionService.decrypt(tenant.getAzureStorageConnection().getConnectionString()))
|
||||
.containerName(tenant.getAzureStorageConnection().getContainerName())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public S3StorageConnection getS3StorageConnection(String tenantId) {
|
||||
|
||||
var tenant = tenantsClient.getTenant(tenantId);
|
||||
return S3StorageConnection.builder()
|
||||
.key(tenant.getS3StorageConnection().getKey())
|
||||
.secret(encryptionDecryptionService.decrypt(tenant.getS3StorageConnection().getSecret()))
|
||||
.signerType(tenant.getS3StorageConnection().getSignerType())
|
||||
.bucketName(tenant.getS3StorageConnection().getBucketName())
|
||||
.region(tenant.getS3StorageConnection().getRegion())
|
||||
.endpoint(tenant.getS3StorageConnection().getEndpoint())
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
@ -4,6 +4,7 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileType;
|
||||
import com.iqser.red.service.search.v1.server.model.Text;
|
||||
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
|
||||
import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist;
|
||||
import com.iqser.red.storage.commons.service.StorageService;
|
||||
|
||||
@ -23,7 +24,7 @@ public class TextStorageService {
|
||||
public Text getText(String dossierId, String fileId) {
|
||||
|
||||
try {
|
||||
return storageService.readJSONObject(StorageIdUtils.getStorageId(dossierId, fileId, FileType.SIMPLIFIED_TEXT), Text.class);
|
||||
return storageService.readJSONObject(TenantContext.getTenantId(), StorageIdUtils.getStorageId(dossierId, fileId, FileType.SIMPLIFIED_TEXT), Text.class);
|
||||
} catch (StorageObjectDoesNotExist e) {
|
||||
throw new RuntimeException("Text is not available", e);
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -47,9 +47,20 @@ public class EsClientCache {
|
||||
.maximumSize(maximumSize)
|
||||
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
|
||||
.removalListener((RemovalListener<String, EsClient>) removal -> {
|
||||
var elasticsearchClient = removal.getValue();
|
||||
elasticsearchClient.shutdown();
|
||||
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
|
||||
var clientToRemove = removal.getValue();
|
||||
int numberOfUsersForSameClient = 0;
|
||||
for (var client : clients.asMap().values()) {
|
||||
if (clientToRemove.getElasticsearchClient().equals(client.getElasticsearchClient())){
|
||||
numberOfUsersForSameClient++;
|
||||
}
|
||||
}
|
||||
if(numberOfUsersForSameClient <= 1){
|
||||
clientToRemove.shutdown();
|
||||
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
|
||||
} else {
|
||||
log.info("Keeping client open from {} because it is still used by {} other tenants", removal.getKey(), numberOfUsersForSameClient - 1);
|
||||
}
|
||||
|
||||
})
|
||||
.build(new CacheLoader<>() {
|
||||
public EsClient load(String key) {
|
||||
|
||||
@ -47,9 +47,20 @@ public class OpensearchClientCache {
|
||||
.maximumSize(maximumSize)
|
||||
.expireAfterAccess(expireAfterAccess, TimeUnit.MINUTES)
|
||||
.removalListener((RemovalListener<String, OpensearchClient>) removal -> {
|
||||
var client = removal.getValue();
|
||||
client.shutdown();
|
||||
log.info("Closed opensearch client for tenant {}", removal.getKey());
|
||||
var clientToRemove = removal.getValue();
|
||||
int numberOfUsersForSameClient = 0;
|
||||
for (var client : clients.asMap().values()) {
|
||||
if (clientToRemove.getClient().equals(client.getClient())){
|
||||
numberOfUsersForSameClient++;
|
||||
}
|
||||
}
|
||||
if(numberOfUsersForSameClient <= 1){
|
||||
clientToRemove.shutdown();
|
||||
log.info("Closed elasticsearch client for tenant {}", removal.getKey());
|
||||
} else {
|
||||
log.info("Keeping client open from {} because it is still used by {} other tenants", removal.getKey(), numberOfUsersForSameClient - 1);
|
||||
}
|
||||
|
||||
})
|
||||
.build(new CacheLoader<>() {
|
||||
public OpensearchClient load(String key) {
|
||||
|
||||
@ -37,18 +37,5 @@ management:
|
||||
search:
|
||||
backend: elasticsearch
|
||||
|
||||
elasticsearch:
|
||||
hosts:
|
||||
- ${elasticsearch.cluster.hosts}
|
||||
port: ${elasticsearch.cluster.port:9200}
|
||||
scheme: ${elasticsearch.cluster.scheme:http}
|
||||
username: ${elasticsearch.cluster.username}
|
||||
password: ${elasticsearch.cluster.password}
|
||||
apiKeyAuth: ${elasticsearch.cluster.apikey}
|
||||
|
||||
storage:
|
||||
signer-type: 'AWSS3V4SignerType'
|
||||
bucket-name: 'redaction'
|
||||
region: 'us-east-1'
|
||||
endpoint: 'https://s3.amazonaws.com'
|
||||
backend: 's3'
|
||||
|
||||
@ -32,7 +32,7 @@ public class FileSystemBackedStorageService implements StorageService {
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public InputStreamResource getObject(String objectId) {
|
||||
public InputStreamResource getObject(String tenantId, String objectId) {
|
||||
|
||||
var res = dataMap.get(objectId);
|
||||
if (res == null) {
|
||||
@ -44,28 +44,22 @@ public class FileSystemBackedStorageService implements StorageService {
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteObject(String objectId) {
|
||||
public void deleteObject(String tenantId, String objectId) {
|
||||
|
||||
dataMap.remove(objectId);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean objectExists(String objectId) {
|
||||
public boolean objectExists(String tenantId, String objectId) {
|
||||
|
||||
return dataMap.containsKey(objectId);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public <T> void storeJSONObject(String objectId, T any) {
|
||||
public <T> void storeJSONObject(String tenantId, String objectId, T any) {
|
||||
|
||||
File tempFile = File.createTempFile("test", ".tmp");
|
||||
getMapper().writeValue(new FileOutputStream(tempFile), any);
|
||||
@ -81,7 +75,7 @@ public class FileSystemBackedStorageService implements StorageService {
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public <T> T readJSONObject(String objectId, Class<T> clazz) {
|
||||
public <T> T readJSONObject(String tenantId, String objectId, Class<T> clazz) {
|
||||
|
||||
if (dataMap.get(objectId) == null || !dataMap.get(objectId).exists()) {
|
||||
throw new StorageObjectDoesNotExist("Stored object not found");
|
||||
@ -104,7 +98,7 @@ public class FileSystemBackedStorageService implements StorageService {
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public void storeObject(String objectId, InputStream stream) {
|
||||
public void storeObject(String tenantId, String objectId, InputStream stream) {
|
||||
|
||||
File tempFile = File.createTempFile("test", ".tmp");
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import com.iqser.red.service.search.v1.server.client.FileStatusProcessingUpdateC
|
||||
import com.iqser.red.service.search.v1.server.client.IndexInformationClient;
|
||||
import com.iqser.red.service.search.v1.server.controller.SearchController;
|
||||
import com.iqser.red.service.search.v1.server.model.Text;
|
||||
import com.iqser.red.service.search.v1.server.multitenancy.TenantContext;
|
||||
import com.iqser.red.service.search.v1.server.queue.IndexingMessageReceiver;
|
||||
import com.iqser.red.service.search.v1.server.service.utils.MetricValidationUtils;
|
||||
|
||||
@ -66,7 +67,7 @@ public class MetricsIntegrationTest extends AbstractElasticsearchIntegrationTest
|
||||
|
||||
ClassPathResource textResource = new ClassPathResource("files/Text2.json");
|
||||
Text text = objectMapper.readValue(textResource.getInputStream(), Text.class);
|
||||
storageService.storeJSONObject(TextStorageService.StorageIdUtils.getStorageId("1", "1", FileType.SIMPLIFIED_TEXT), text);
|
||||
storageService.storeJSONObject(TenantContext.getTenantId(), TextStorageService.StorageIdUtils.getStorageId("1", "1", FileType.SIMPLIFIED_TEXT), text);
|
||||
|
||||
IndexMessage indexRequest = new IndexMessage();
|
||||
indexRequest.setDossierId("1");
|
||||
|
||||
@ -10,14 +10,7 @@ spring:
|
||||
allow-circular-references: true # FIXME
|
||||
|
||||
storage:
|
||||
signer-type: 'AWSS3V4SignerType'
|
||||
bucket-name: 'redaction'
|
||||
region: 'us-east-1'
|
||||
endpoint: 'https://s3.amazonaws.com'
|
||||
backend: 's3'
|
||||
elasticsearch:
|
||||
hosts:
|
||||
- 'localhost'
|
||||
|
||||
management:
|
||||
endpoint:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user