RED-8031 - Migration for non gzipped files #237
@ -47,9 +47,11 @@ public class SaasMigrationService implements TenantSyncService {
|
||||
|
||||
private final SaasAnnotationIdMigrationService saasAnnotationIdMigrationService;
|
||||
|
||||
private final UncompressedFilesMigrationService uncompressedFilesMigrationService;
|
||||
|
||||
|
||||
@Override
|
||||
public void syncTenant(TenantSyncEvent tenantSyncEvent) {
|
||||
public synchronized void syncTenant(TenantSyncEvent tenantSyncEvent) {
|
||||
|
||||
startMigrationForTenant(tenantSyncEvent.getTenantId());
|
||||
}
|
||||
@ -59,9 +61,12 @@ public class SaasMigrationService implements TenantSyncService {
|
||||
public void startMigrationForTenant(String tenantId) {
|
||||
|
||||
// TODO migrate rules.
|
||||
|
||||
automaticAnalysisJob.stopForTenant(tenantId);
|
||||
|
||||
log.info("Starting uncompressed files migration ...");
|
||||
uncompressedFilesMigrationService.migrateUncompressedFiles(tenantId);
|
||||
log.info("Finished uncompressed files migration ...");
|
||||
|
||||
int numberOfFiles = 0;
|
||||
var dossiers = dossierService.getAllDossiers().stream().filter(dossier -> dossier.getHardDeletedTime() == null).toList();
|
||||
for (var dossier : dossiers) {
|
||||
|
||||
@ -0,0 +1,114 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.migration;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.storage.commons.exception.StorageException;
|
||||
import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist;
|
||||
import com.iqser.red.storage.commons.service.StorageClientCache;
|
||||
import com.iqser.red.storage.commons.service.StorageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
|
||||
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
|
||||
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
|
||||
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class UncompressedFilesMigrationService {
|
||||
|
||||
private final StorageClientCache storageClientCache;
|
||||
|
||||
private final StorageService storageService;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
public void migrateUncompressedFiles(String tenant) {
|
||||
|
||||
var client = storageClientCache.getClient(tenant);
|
||||
if (client.getS3StorageClient() != null) {
|
||||
|
||||
var keysToMigrate = new ArrayList<String>();
|
||||
ListObjectsV2Request listObjects = ListObjectsV2Request
|
||||
.builder()
|
||||
.bucket(client.getS3StorageClient().getS3StorageConnection().getBucketName())
|
||||
.maxKeys(100)
|
||||
.build();
|
||||
|
||||
var response = client.getS3StorageClient().listObjectsV2Paginator(listObjects);
|
||||
|
||||
var counter = new AtomicInteger();
|
||||
for (var listResponse : response) {
|
||||
|
||||
var objects = listResponse.contents();
|
||||
for (var object : objects) {
|
||||
|
||||
counter.incrementAndGet();
|
||||
if (!object.key().toLowerCase(Locale.ROOT).endsWith(".gz")) {
|
||||
log.info("Found key to migrate/compress: {}", object.key());
|
||||
keysToMigrate.add(object.key());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Total files that require compression: {} / {} ", keysToMigrate.size(), counter.get());
|
||||
|
||||
String tmpdir = Files.createTempDirectory("compressed-migration").toFile().getAbsolutePath();
|
||||
|
||||
for (var key : keysToMigrate) {
|
||||
|
||||
log.info("Migrating key: {}", key);
|
||||
var getObjectRequest = GetObjectRequest
|
||||
.builder()
|
||||
.bucket(client.getS3StorageClient().getS3StorageConnection().getBucketName())
|
||||
.key(key)
|
||||
.build();
|
||||
|
||||
var tempFile = new File(tmpdir, key);
|
||||
// in case it was created by a previous migration
|
||||
tempFile.mkdirs();
|
||||
tempFile.delete();
|
||||
|
||||
client.getS3StorageClient().getObject(getObjectRequest, tempFile.toPath());
|
||||
|
||||
var fis = new FileInputStream(tempFile);
|
||||
storageService.storeObject(tenant, key, fis);
|
||||
|
||||
IOUtils.closeQuietly(fis);
|
||||
try {
|
||||
tempFile.delete();
|
||||
} catch (Exception e) {
|
||||
log.debug("Failed to delete temp file: {}", tempFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
var deleteObjectsRequest = DeleteObjectRequest.builder()
|
||||
.bucket(client.getS3StorageClient().getS3StorageConnection().getBucketName())
|
||||
.key(key)
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.getS3StorageClient().deleteObject(deleteObjectsRequest);
|
||||
} catch (NoSuchKeyException e) {
|
||||
throw new StorageObjectDoesNotExist(e);
|
||||
}
|
||||
|
||||
log.info("Key: {} migrated successfully", key);
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new StorageException("No client available for tenant: " + tenant);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user