Fixed transitive dependencies and concurrent download issue RED-8175
This commit is contained in:
parent
ea9af28633
commit
d0571a436c
@ -6,9 +6,9 @@ plugins {
|
||||
val springBootStarterVersion = "3.1.5"
|
||||
|
||||
dependencies {
|
||||
api(project(":persistence-service-shared-api-v1"))
|
||||
api(project(":persistence-service-external-api-v1"))
|
||||
api(project(":persistence-service-internal-api-v1"))
|
||||
api(project(":persistence-service-shared-api-v1"))
|
||||
api("com.iqser.red.service:pdftron-redaction-service-api-v1:${rootProject.extra.get("pdftronRedactionServiceVersion")}") {
|
||||
exclude(group = "com.iqser.red.service", module = "persistence-service-internal-api-v1")
|
||||
exclude(group = "com.iqser.red.service", module = "persistence-service-shared-api-v1")
|
||||
@ -47,13 +47,13 @@ dependencies {
|
||||
api("org.springframework.boot:spring-boot-starter-web:${springBootStarterVersion}")
|
||||
api("com.iqser.red.commons:spring-commons:2.1.0")
|
||||
api("com.iqser.red.commons:jackson-commons:2.1.0")
|
||||
api("org.apache.commons:commons-compress:1.21")
|
||||
api("com.iqser.red.commons:storage-commons:2.45.0")
|
||||
api("com.iqser.red.commons:spring-boot-starter-web-custom-commons:2.1.0")
|
||||
api("com.iqser.red.commons:metric-commons:2.1.0")
|
||||
api("org.apache.commons:commons-compress:1.21")
|
||||
api("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2")
|
||||
api("org.postgresql:postgresql:42.2.23")
|
||||
api("org.apache.commons:commons-lang3:3.12.0")
|
||||
api("com.iqser.red.commons:spring-boot-starter-web-custom-commons:2.1.0")
|
||||
api("com.iqser.red.commons:metric-commons:2.1.0")
|
||||
api("com.opencsv:opencsv:5.4")
|
||||
api("org.springframework.cloud:spring-cloud-starter-openfeign:4.0.4")
|
||||
api("commons-validator:commons-validator:1.7")
|
||||
|
||||
@ -23,6 +23,10 @@ public class MessagingConfiguration {
|
||||
public static final String DOWNLOAD_QUEUE = "downloadQueue";
|
||||
public static final String DOWNLOAD_DLQ = "downloadDLQ";
|
||||
|
||||
public static final String DOWNLOAD_COMPRESSION_QUEUE = "download_compression_queue";
|
||||
public static final String DOWNLOAD_COMPRESSION_DLQ = "download_compression_dlq";
|
||||
|
||||
|
||||
public static final String EXPORT_DOWNLOAD_QUEUE = "exportDownloadQueue";
|
||||
public static final String EXPORT_DOWNLOAD_DLQ = "exportDownloadDLQ";
|
||||
|
||||
@ -227,6 +231,19 @@ public class MessagingConfiguration {
|
||||
return QueueBuilder.durable(DOWNLOAD_DLQ).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue downloadCompressionQueue() {
|
||||
|
||||
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", DOWNLOAD_COMPRESSION_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue downloadCompressionQueueDLQ() {
|
||||
|
||||
return QueueBuilder.durable(DOWNLOAD_COMPRESSION_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue exportDownloadQueue() {
|
||||
|
||||
@ -0,0 +1,37 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.download;
|
||||
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadStatusRepository;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DownloadCompressingService {
|
||||
|
||||
private final DownloadPreparationService downloadPreparationService;
|
||||
private final DownloadStatusRepository downloadStatusRepository;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
public void markDownloadForCompression(String downloadId, String userId) {
|
||||
|
||||
var updated = downloadStatusRepository.updateStatusOnlyIfNotAlreadySet(downloadId, DownloadStatusValue.COMPRESSING);
|
||||
|
||||
if (updated == 1) {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE, DownloadJob.builder().storageId(downloadId).userId(userId).build());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void compressDownload(String downloadId) {
|
||||
|
||||
downloadPreparationService.createDownload(downloadId);
|
||||
downloadPreparationService.clearRedactionStatusEntries(downloadId);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,34 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.download;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DownloadCompressionMessageReceiver {
|
||||
|
||||
private final DownloadCompressingService downloadCompressingService;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_COMPRESSION_QUEUE)
|
||||
public void receive(DownloadJob downloadJob) throws JsonProcessingException {
|
||||
|
||||
downloadCompressingService.compressDownload(downloadJob.getStorageId());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,13 +1,9 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.download;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
|
||||
@ -24,25 +20,22 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class DownloadDLQMessageReceiver {
|
||||
|
||||
private final DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final RetryTemplate retryTemplate;
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_DLQ)
|
||||
public void handleDlqMessage(DownloadJob downloadJob) throws IOException {
|
||||
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_COMPRESSION_DLQ)
|
||||
public void handleCompressionDlqMessage(DownloadJob downloadJob) {
|
||||
|
||||
log.warn("Handling download job in DLQ userId: {} storageId: {} - setting status to error!", downloadJob.getUserId(), downloadJob.getStorageId());
|
||||
log.warn("Handling download compression in DLQ userId: {} storageId: {} - setting status to error!", downloadJob.getUserId(), downloadJob.getStorageId());
|
||||
setDownloadFailed(downloadJob.getUserId(), downloadJob.getStorageId());
|
||||
}
|
||||
|
||||
|
||||
public void setDownloadFailed(String userId, String downloadId) {
|
||||
@RabbitListener(queues = MessagingConfiguration.DOWNLOAD_DLQ)
|
||||
public void handleDlqMessage(DownloadJob downloadJob) {
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.warn("Retrying {} time to set FAILED status for downloadJob userId: {} storageId: {}", retryContext.getRetryCount(), userId, downloadId);
|
||||
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
|
||||
return null;
|
||||
});
|
||||
log.warn("Handling download job in DLQ userId: {} storageId: {} - setting status to error!", downloadJob.getUserId(), downloadJob.getStorageId());
|
||||
setDownloadFailed(downloadJob.getUserId(), downloadJob.getStorageId());
|
||||
}
|
||||
|
||||
|
||||
@ -61,4 +54,14 @@ public class DownloadDLQMessageReceiver {
|
||||
setDownloadFailed(reportResultMessage.getUserId(), reportResultMessage.getDownloadId());
|
||||
}
|
||||
|
||||
|
||||
private void setDownloadFailed(String userId, String downloadId) {
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.warn("Retrying {} time to set FAILED status for downloadJob userId: {} storageId: {}", retryContext.getRetryCount(), userId, downloadId);
|
||||
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -176,8 +176,9 @@ public class DownloadPreparationService {
|
||||
}
|
||||
|
||||
|
||||
public void createDownload(List<DownloadRedactionFileStatusEntity> redactionFileResults, String downloadId) {
|
||||
public void createDownload(String downloadId) {
|
||||
|
||||
var redactionFileResults = downloadRedactionFileStatusRepository.findAllByDownloadStorageId(downloadId);
|
||||
DownloadStatusEntity downloadStatus = downloadStatusPersistenceService.getStatus(downloadId);
|
||||
|
||||
var storedFileInformations = getStoredFileInformation(downloadId);
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.job;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadCompressingService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadPreparationService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadRedactionFileStatusRepository;
|
||||
@ -20,7 +21,7 @@ public class DownloadReadyJob implements Job {
|
||||
|
||||
private final DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
private final DownloadRedactionFileStatusRepository downloadRedactionFileStatusRepository;
|
||||
private final DownloadPreparationService downloadPreparationService;
|
||||
private final DownloadCompressingService downloadCompressingService;
|
||||
private final TenantProvider tenantProvider;
|
||||
|
||||
|
||||
@ -42,8 +43,7 @@ public class DownloadReadyJob implements Job {
|
||||
int numberOfFiles = download.getFiles().size();
|
||||
var downloadRedactionFileStatus = downloadRedactionFileStatusRepository.findAllByDownloadStorageId(download.getStorageId());
|
||||
if (downloadRedactionFileStatus.size() == numberOfFiles) {
|
||||
downloadPreparationService.createDownload(downloadRedactionFileStatus, download.getStorageId());
|
||||
downloadPreparationService.clearRedactionStatusEntries(download.getStorageId());
|
||||
downloadCompressingService.markDownloadForCompression(download.getStorageId(),download.getUserId());
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@ -7,6 +7,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.download.DownloadStatusEntity;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
|
||||
@ -17,7 +18,12 @@ public interface DownloadStatusRepository extends JpaRepository<DownloadStatusEn
|
||||
|
||||
@Modifying
|
||||
@Query("update DownloadStatusEntity ds set ds.status = :status where ds.storageId = :storageId")
|
||||
void updateStatus(@Param("storageId") String storageId, @Param("status") DownloadStatusValue status);
|
||||
int updateStatus(@Param("storageId") String storageId, @Param("status") DownloadStatusValue status);
|
||||
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Query("update DownloadStatusEntity ds set ds.status = :status where ds.storageId = :storageId and ds.status != :status")
|
||||
int updateStatusOnlyIfNotAlreadySet(@Param("storageId") String storageId, @Param("status") DownloadStatusValue status);
|
||||
|
||||
@Modifying
|
||||
@Query("update DownloadStatusEntity ds set ds.lastDownload = :lastDownload where ds.storageId = :storageId")
|
||||
|
||||
@ -11,8 +11,6 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.mock.web.MockMultipartFile;
|
||||
|
||||
@ -26,6 +24,8 @@ import com.iqser.red.service.peristence.v1.server.integration.service.DossierTem
|
||||
import com.iqser.red.service.peristence.v1.server.integration.service.DossierTesterAndProvider;
|
||||
import com.iqser.red.service.peristence.v1.server.integration.service.FileTesterAndProvider;
|
||||
import com.iqser.red.service.peristence.v1.server.integration.utils.AbstractPersistenceServerServiceTest;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.DownloadJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadCompressionMessageReceiver;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadReportMessageReceiver;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.download.RedactionResultMessageReceiver;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.DownloadReadyJob;
|
||||
@ -45,10 +45,11 @@ import com.iqser.red.service.redaction.report.v1.api.model.StoredFileInformation
|
||||
import com.iqser.red.storage.commons.service.StorageService;
|
||||
import com.knecon.fforesight.tenantcommons.TenantContext;
|
||||
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE)
|
||||
public class DownloadPreparationTest extends AbstractPersistenceServerServiceTest {
|
||||
@ -92,6 +93,9 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
|
||||
@Autowired
|
||||
DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
|
||||
@Autowired
|
||||
DownloadCompressionMessageReceiver downloadCompressionMessageReceiver;
|
||||
|
||||
|
||||
@BeforeEach
|
||||
public void createTestData() {
|
||||
@ -152,12 +156,13 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
|
||||
|
||||
when(this.tenantsClient.getTenants()).thenReturn(List.of(TenantResponse.builder().tenantId("redaction").details(Map.of("persistence-service-ready", true)).build()));
|
||||
downloadReadyJob.execute(null); // Will be called by scheduler in prod.
|
||||
var firstStatus = getFirstStatus();
|
||||
assertThat(getFirstStatus().getStatus()).isEqualTo(DownloadStatusValue.COMPRESSING);
|
||||
|
||||
List<DownloadStatus> finalDownloadStatuses = downloadClient.getDownloadStatus().getDownloadStatus();
|
||||
assertThat(finalDownloadStatuses).hasSize(1);
|
||||
DownloadStatus finalDownloadStatus = finalDownloadStatuses.get(0);
|
||||
assertThat(finalDownloadStatus.getStatus()).isEqualTo(DownloadStatusValue.READY);
|
||||
assertThat(finalDownloadStatus.getFileSize()).isGreaterThan(0);
|
||||
downloadCompressionMessageReceiver.receive(DownloadJob.builder().storageId(firstStatus.getStorageId()).build());
|
||||
firstStatus = getFirstStatus();
|
||||
assertThat(firstStatus.getStatus()).isEqualTo(DownloadStatusValue.READY);
|
||||
assertThat(firstStatus.getFileSize()).isGreaterThan(0);
|
||||
|
||||
clearTenantContext();
|
||||
}
|
||||
@ -206,6 +211,14 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
|
||||
}
|
||||
|
||||
|
||||
private DownloadStatus getFirstStatus() {
|
||||
|
||||
List<DownloadStatus> finalDownloadStatuses = downloadClient.getDownloadStatus().getDownloadStatus();
|
||||
assertThat(finalDownloadStatuses).hasSize(1);
|
||||
return finalDownloadStatuses.iterator().next();
|
||||
}
|
||||
|
||||
|
||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||
private final class DossierWithSingleFile {
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ package com.iqser.red.service.persistence.service.v1.api.shared.model.download;
|
||||
public enum DownloadStatusValue {
|
||||
QUEUED,
|
||||
GENERATING,
|
||||
COMPRESSING,
|
||||
READY,
|
||||
FAILED
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user