Merge branch 'RED-8175' into 'master'

Fixed transitive dependencies and concurrent download issue RED-8175

Closes RED-8175

See merge request redactmanager/persistence-service!295
This commit is contained in:
Timo Bejan 2024-01-08 11:23:38 +01:00
commit be705a5406
10 changed files with 143 additions and 31 deletions

View File

@ -6,9 +6,9 @@ plugins {
val springBootStarterVersion = "3.1.5"
dependencies {
api(project(":persistence-service-shared-api-v1"))
api(project(":persistence-service-external-api-v1"))
api(project(":persistence-service-internal-api-v1"))
api(project(":persistence-service-shared-api-v1"))
api("com.iqser.red.service:pdftron-redaction-service-api-v1:${rootProject.extra.get("pdftronRedactionServiceVersion")}") {
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
@ -47,13 +47,13 @@ dependencies {
api("org.springframework.boot:spring-boot-starter-web:${springBootStarterVersion}")
api("com.iqser.red.commons:spring-commons:2.1.0")
api("com.iqser.red.commons:jackson-commons:2.1.0")
api("org.apache.commons:commons-compress:1.21")
api("com.iqser.red.commons:storage-commons:2.45.0")
api("com.iqser.red.commons:spring-boot-starter-web-custom-commons:2.1.0")
api("com.iqser.red.commons:metric-commons:2.1.0")
api("org.apache.commons:commons-compress:1.21")
api("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2")
api("org.postgresql:postgresql:42.2.23")
api("org.apache.commons:commons-lang3:3.12.0")
api("com.iqser.red.commons:spring-boot-starter-web-custom-commons:2.1.0")
api("com.iqser.red.commons:metric-commons:2.1.0")
api("com.opencsv:opencsv:5.4")
api("org.springframework.cloud:spring-cloud-starter-openfeign:4.0.4")
api("commons-validator:commons-validator:1.7")

View File

@ -23,6 +23,10 @@ public class MessagingConfiguration {
public static final String DOWNLOAD_QUEUE = "downloadQueue";
public static final String DOWNLOAD_DLQ = "downloadDLQ";
public static final String DOWNLOAD_COMPRESSION_QUEUE = "download_compression_queue";
public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_dlq";
public static final String EXPORT_DOWNLOAD_QUEUE = "exportDownloadQueue";
public static final String EXPORT_DOWNLOAD_DLQ = "exportDownloadDLQ";
@ -227,6 +231,19 @@ public class MessagingConfiguration {
return QueueBuilder.durable(DOWNLOAD_DLQ).build();
}
@Bean
public Queue downloadCompressionQueue() {
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", DOWNLOAD_COMPRESSION_DLQ).build();
}
@Bean
public Queue downloadCompressionQueueDLQ() {
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_DLQ).build();
}
@Bean
public Queue exportDownloadQueue() {

View File

@ -0,0 +1,37 @@
package com.iqser.red.service.persistence.management.v1.processor.service.download;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadStatusRepository;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class DownloadCompressingService {
private final DownloadPreparationService downloadPreparationService;
private final DownloadStatusRepository downloadStatusRepository;
private final RabbitTemplate rabbitTemplate;
public void markDownloadForCompression(String downloadId, String userId) {
var updated = downloadStatusRepository.updateStatusOnlyIfNotAlreadySet(downloadId, DownloadStatusValue.COMPRESSING);
if (updated == 1) {
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE, DownloadJob.builder().storageId(downloadId).userId(userId).build());
}
}
public void compressDownload(String downloadId) {
downloadPreparationService.createDownload(downloadId);
downloadPreparationService.clearRedactionStatusEntries(downloadId);
}
}

View File

@ -0,0 +1,34 @@
package com.iqser.red.service.persistence.management.v1.processor.service.download;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
public class DownloadCompressionMessageReceiver {
private final DownloadCompressingService downloadCompressingService;
private final ObjectMapper objectMapper;
@SneakyThrows
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE)
public void receive(DownloadJob downloadJob) throws JsonProcessingException {
downloadCompressingService.compressDownload(downloadJob.getStorageId());
}
}

View File

@ -1,13 +1,9 @@
package com.iqser.red.service.persistence.management.v1.processor.service.download;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
@ -24,25 +20,22 @@ import lombok.extern.slf4j.Slf4j;
public class DownloadDLQMessageReceiver {
private final DownloadStatusPersistenceService downloadStatusPersistenceService;
private final ObjectMapper objectMapper;
private final RetryTemplate retryTemplate;
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_DLQ)
public void handleDlqMessage(DownloadJob downloadJob) throws IOException {
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_COMPRESSION_DLQ)
public void handleCompressionDlqMessage(DownloadJob downloadJob) {
log.warn("Handling download job in DLQ userId: {} storageId: {} - setting status to error!", downloadJob.getUserId(), downloadJob.getStorageId());
log.warn("Handling download compression in DLQ userId: {} storageId: {} - setting status to error!", downloadJob.getUserId(), downloadJob.getStorageId());
setDownloadFailed(downloadJob.getUserId(), downloadJob.getStorageId());
}
public void setDownloadFailed(String userId, String downloadId) {
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_DLQ)
public void handleDlqMessage(DownloadJob downloadJob) {
retryTemplate.execute(retryContext -> {
log.warn("Retrying {} time to set FAILED status for downloadJob userId: {} storageId: {}", retryContext.getRetryCount(), userId, downloadId);
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
return null;
});
log.warn("Handling download job in DLQ userId: {} storageId: {} - setting status to error!", downloadJob.getUserId(), downloadJob.getStorageId());
setDownloadFailed(downloadJob.getUserId(), downloadJob.getStorageId());
}
@ -61,4 +54,14 @@ public class DownloadDLQMessageReceiver {
setDownloadFailed(reportResultMessage.getUserId(), reportResultMessage.getDownloadId());
}
private void setDownloadFailed(String userId, String downloadId) {
retryTemplate.execute(retryContext -> {
log.warn("Retrying {} time to set FAILED status for downloadJob userId: {} storageId: {}", retryContext.getRetryCount(), userId, downloadId);
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
return null;
});
}
}

View File

@ -176,8 +176,9 @@ public class DownloadPreparationService {
}
public void createDownload(List<DownloadRedactionFileStatusEntity> redactionFileResults, String downloadId) {
public void createDownload(String downloadId) {
var redactionFileResults = downloadRedactionFileStatusRepository.findAllByDownloadStorageId(downloadId);
DownloadStatusEntity downloadStatus = downloadStatusPersistenceService.getStatus(downloadId);
var storedFileInformations = getStoredFileInformation(downloadId);

View File

@ -1,5 +1,6 @@
package com.iqser.red.service.persistence.management.v1.processor.service.job;
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadCompressingService;
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadPreparationService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadRedactionFileStatusRepository;
@ -20,7 +21,7 @@ public class DownloadReadyJob implements Job {
private final DownloadStatusPersistenceService downloadStatusPersistenceService;
private final DownloadRedactionFileStatusRepository downloadRedactionFileStatusRepository;
private final DownloadPreparationService downloadPreparationService;
private final DownloadCompressingService downloadCompressingService;
private final TenantProvider tenantProvider;
@ -42,8 +43,7 @@ public class DownloadReadyJob implements Job {
int numberOfFiles = download.getFiles().size();
var downloadRedactionFileStatus = downloadRedactionFileStatusRepository.findAllByDownloadStorageId(download.getStorageId());
if (downloadRedactionFileStatus.size() == numberOfFiles) {
downloadPreparationService.createDownload(downloadRedactionFileStatus, download.getStorageId());
downloadPreparationService.clearRedactionStatusEntries(download.getStorageId());
downloadCompressingService.markDownloadForCompression(download.getStorageId(),download.getUserId());
}
});
});

View File

@ -7,6 +7,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import com.iqser.red.service.persistence.management.v1.processor.entity.download.DownloadStatusEntity;
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
@ -17,7 +18,12 @@ public interface DownloadStatusRepository extends JpaRepository<DownloadStatusEn
@Modifying
@Query("update DownloadStatusEntity ds set ds.status = :status where ds.storageId = :storageId")
void updateStatus(@Param("storageId") String storageId, @Param("status") DownloadStatusValue status);
int updateStatus(@Param("storageId") String storageId, @Param("status") DownloadStatusValue status);
@Modifying
@Transactional
@Query("update DownloadStatusEntity ds set ds.status = :status where ds.storageId = :storageId and ds.status != :status")
int updateStatusOnlyIfNotAlreadySet(@Param("storageId") String storageId, @Param("status") DownloadStatusValue status);
@Modifying
@Query("update DownloadStatusEntity ds set ds.lastDownload = :lastDownload where ds.storageId = :storageId")

View File

@ -11,8 +11,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mock.web.MockMultipartFile;
@ -26,6 +24,8 @@ import com.iqser.red.service.peristence.v1.server.integration.service.DossierTem
import com.iqser.red.service.peristence.v1.server.integration.service.DossierTesterAndProvider;
import com.iqser.red.service.peristence.v1.server.integration.service.FileTesterAndProvider;
import com.iqser.red.service.peristence.v1.server.integration.utils.AbstractPersistenceServerServiceTest;
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadCompressionMessageReceiver;
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadReportMessageReceiver;
import com.iqser.red.service.persistence.management.v1.processor.service.download.RedactionResultMessageReceiver;
import com.iqser.red.service.persistence.management.v1.processor.service.job.DownloadReadyJob;
@ -45,10 +45,11 @@ import com.iqser.red.service.redaction.report.v1.api.model.StoredFileInformation
import com.iqser.red.storage.commons.service.StorageService;
import com.knecon.fforesight.tenantcommons.TenantContext;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import lombok.AccessLevel;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@FieldDefaults(level = AccessLevel.PRIVATE)
public class DownloadPreparationTest extends AbstractPersistenceServerServiceTest {
@ -92,6 +93,9 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
@Autowired
DownloadStatusPersistenceService downloadStatusPersistenceService;
@Autowired
DownloadCompressionMessageReceiver downloadCompressionMessageReceiver;
@BeforeEach
public void createTestData() {
@ -152,12 +156,13 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
when(this.tenantsClient.getTenants()).thenReturn(List.of(TenantResponse.builder().tenantId("redaction").details(Map.of("persistence-service-ready", true)).build()));
downloadReadyJob.execute(null); // Will be called by scheduler in prod.
var firstStatus = getFirstStatus();
assertThat(getFirstStatus().getStatus()).isEqualTo(DownloadStatusValue.COMPRESSING);
List<DownloadStatus> finalDownloadStatuses = downloadClient.getDownloadStatus().getDownloadStatus();
assertThat(finalDownloadStatuses).hasSize(1);
DownloadStatus finalDownloadStatus = finalDownloadStatuses.get(0);
assertThat(finalDownloadStatus.getStatus()).isEqualTo(DownloadStatusValue.READY);
assertThat(finalDownloadStatus.getFileSize()).isGreaterThan(0);
downloadCompressionMessageReceiver.receive(DownloadJob.builder().storageId(firstStatus.getStorageId()).build());
firstStatus = getFirstStatus();
assertThat(firstStatus.getStatus()).isEqualTo(DownloadStatusValue.READY);
assertThat(firstStatus.getFileSize()).isGreaterThan(0);
clearTenantContext();
}
@ -206,6 +211,14 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
}
private DownloadStatus getFirstStatus() {
List<DownloadStatus> finalDownloadStatuses = downloadClient.getDownloadStatus().getDownloadStatus();
assertThat(finalDownloadStatuses).hasSize(1);
return finalDownloadStatuses.iterator().next();
}
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
private final class DossierWithSingleFile {

View File

@ -3,6 +3,7 @@ package com.iqser.red.service.persistence.service.v1.api.shared.model.download;
public enum DownloadStatusValue {
QUEUED,
GENERATING,
COMPRESSING,
READY,
FAILED
}