RED-8031 - Migration for non gzipped files #237
@ -47,9 +47,11 @@ public class SaasMigrationService implements TenantSyncService {
|
|||||||
|
|
||||||
private final SaasAnnotationIdMigrationService saasAnnotationIdMigrationService;
|
private final SaasAnnotationIdMigrationService saasAnnotationIdMigrationService;
|
||||||
|
|
||||||
|
private final UncompressedFilesMigrationService uncompressedFilesMigrationService;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void syncTenant(TenantSyncEvent tenantSyncEvent) {
|
public synchronized void syncTenant(TenantSyncEvent tenantSyncEvent) {
|
||||||
|
|
||||||
startMigrationForTenant(tenantSyncEvent.getTenantId());
|
startMigrationForTenant(tenantSyncEvent.getTenantId());
|
||||||
}
|
}
|
||||||
@ -59,9 +61,12 @@ public class SaasMigrationService implements TenantSyncService {
|
|||||||
public void startMigrationForTenant(String tenantId) {
|
public void startMigrationForTenant(String tenantId) {
|
||||||
|
|
||||||
// TODO migrate rules.
|
// TODO migrate rules.
|
||||||
|
|
||||||
automaticAnalysisJob.stopForTenant(tenantId);
|
automaticAnalysisJob.stopForTenant(tenantId);
|
||||||
|
|
||||||
|
log.info("Starting uncompressed files migration ...");
|
||||||
|
uncompressedFilesMigrationService.migrateUncompressedFiles(tenantId);
|
||||||
|
log.info("Finished uncompressed files migration ...");
|
||||||
|
|
||||||
int numberOfFiles = 0;
|
int numberOfFiles = 0;
|
||||||
var dossiers = dossierService.getAllDossiers().stream().filter(dossier -> dossier.getHardDeletedTime() == null).toList();
|
var dossiers = dossierService.getAllDossiers().stream().filter(dossier -> dossier.getHardDeletedTime() == null).toList();
|
||||||
for (var dossier : dossiers) {
|
for (var dossier : dossiers) {
|
||||||
|
|||||||
@ -0,0 +1,114 @@
|
|||||||
|
package com.iqser.red.service.persistence.management.v1.processor.migration;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import com.iqser.red.storage.commons.exception.StorageException;
|
||||||
|
import com.iqser.red.storage.commons.exception.StorageObjectDoesNotExist;
|
||||||
|
import com.iqser.red.storage.commons.service.StorageClientCache;
|
||||||
|
import com.iqser.red.storage.commons.service.StorageService;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
|
||||||
|
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
|
||||||
|
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
|
||||||
|
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class UncompressedFilesMigrationService {
|
||||||
|
|
||||||
|
private final StorageClientCache storageClientCache;
|
||||||
|
|
||||||
|
private final StorageService storageService;
|
||||||
|
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
public void migrateUncompressedFiles(String tenant) {
|
||||||
|
|
||||||
|
var client = storageClientCache.getClient(tenant);
|
||||||
|
if (client.getS3StorageClient() != null) {
|
||||||
|
|
||||||
|
var keysToMigrate = new ArrayList<String>();
|
||||||
|
ListObjectsV2Request listObjects = ListObjectsV2Request
|
||||||
|
.builder()
|
||||||
|
.bucket(client.getS3StorageClient().getS3StorageConnection().getBucketName())
|
||||||
|
.maxKeys(100)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
var response = client.getS3StorageClient().listObjectsV2Paginator(listObjects);
|
||||||
|
|
||||||
|
var counter = new AtomicInteger();
|
||||||
|
for (var listResponse : response) {
|
||||||
|
|
||||||
|
var objects = listResponse.contents();
|
||||||
|
for (var object : objects) {
|
||||||
|
|
||||||
|
counter.incrementAndGet();
|
||||||
|
if (!object.key().toLowerCase(Locale.ROOT).endsWith(".gz")) {
|
||||||
|
log.info("Found key to migrate/compress: {}", object.key());
|
||||||
|
keysToMigrate.add(object.key());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Total files that require compression: {} / {} ", keysToMigrate.size(), counter.get());
|
||||||
|
|
||||||
|
String tmpdir = Files.createTempDirectory("compressed-migration").toFile().getAbsolutePath();
|
||||||
|
|
||||||
|
for (var key : keysToMigrate) {
|
||||||
|
|
||||||
|
log.info("Migrating key: {}", key);
|
||||||
|
var getObjectRequest = GetObjectRequest
|
||||||
|
.builder()
|
||||||
|
.bucket(client.getS3StorageClient().getS3StorageConnection().getBucketName())
|
||||||
|
.key(key)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
var tempFile = new File(tmpdir, key);
|
||||||
|
// in case it was created by a previous migration
|
||||||
|
tempFile.mkdirs();
|
||||||
|
tempFile.delete();
|
||||||
|
|
||||||
|
client.getS3StorageClient().getObject(getObjectRequest, tempFile.toPath());
|
||||||
|
|
||||||
|
var fis = new FileInputStream(tempFile);
|
||||||
|
storageService.storeObject(tenant, key, fis);
|
||||||
|
|
||||||
|
IOUtils.closeQuietly(fis);
|
||||||
|
try {
|
||||||
|
tempFile.delete();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("Failed to delete temp file: {}", tempFile.getAbsolutePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
var deleteObjectsRequest = DeleteObjectRequest.builder()
|
||||||
|
.bucket(client.getS3StorageClient().getS3StorageConnection().getBucketName())
|
||||||
|
.key(key)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
client.getS3StorageClient().deleteObject(deleteObjectsRequest);
|
||||||
|
} catch (NoSuchKeyException e) {
|
||||||
|
throw new StorageObjectDoesNotExist(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Key: {} migrated successfully", key);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
throw new StorageException("No client available for tenant: " + tenant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user