RED-7682: Fixed Joining of parallel processed redaction of files when service is scaled to more than 1
This commit is contained in:
parent
656183d33c
commit
6f87c0bcec
@ -0,0 +1,40 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.entity.download;
|
||||
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionResultDetail;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.JSONDownloadRedactionFileDetailsConverter;
|
||||
import jakarta.persistence.*;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Entity
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Table(name = "download_redaction_file_status")
|
||||
public class DownloadRedactionFileStatusEntity {
|
||||
|
||||
@Id
|
||||
private String id;
|
||||
|
||||
@Column
|
||||
private String downloadStorageId;
|
||||
|
||||
@Column
|
||||
private String fileId;
|
||||
|
||||
@Column
|
||||
private Integer processingErrorCounter;
|
||||
|
||||
@Builder.Default
|
||||
@Column(columnDefinition = "text", name = "details")
|
||||
@Convert(converter = JSONDownloadRedactionFileDetailsConverter.class)
|
||||
private List<RedactionResultDetail> details = new ArrayList<>();
|
||||
|
||||
|
||||
}
|
||||
@ -2,6 +2,7 @@ package com.iqser.red.service.persistence.management.v1.processor.jobs;
|
||||
|
||||
import java.text.ParseException;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.*;
|
||||
import org.quartz.CronExpression;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
import org.quartz.JobBuilder;
|
||||
@ -11,13 +12,6 @@ import org.quartz.TriggerBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.AutomaticAnalysisJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.DeletedFilesCleanupJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.DownloadCleanupJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.KeyCloakUserSyncJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.SendNotificationEmailJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.SyncUserPermissionsJob;
|
||||
|
||||
@Configuration
|
||||
public class CreateJobsConfiguration {
|
||||
|
||||
@ -164,4 +158,31 @@ public class CreateJobsConfiguration {
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Bean
|
||||
public Trigger downloadReadyJobTrigger() throws ParseException {
|
||||
|
||||
return TriggerBuilder.newTrigger()
|
||||
.forJob(downloadReadyJobDetail())
|
||||
.withIdentity("DownloadReadyJobTrigger")
|
||||
.withDescription("Triggers DownloadReadyJob every 10 seconds")
|
||||
.withSchedule(CronScheduleBuilder.cronSchedule(new CronExpression("*/10 * * * * ?")))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public JobDetail downloadReadyJobDetail() {
|
||||
|
||||
return JobBuilder.newJob()
|
||||
.ofType(DownloadReadyJob.class)
|
||||
.storeDurably()
|
||||
.withIdentity("DownloadReadyJob")
|
||||
.withDescription("Builds the download package if all parallel processed files are ready")
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -1,17 +1,5 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.download;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionMessage;
|
||||
@ -23,14 +11,15 @@ import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.DossierTemplateEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.ReportTemplateEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.download.DownloadRedactionFileStatusEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.download.DownloadStatusEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.RedactionFileResult;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.ColorsService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileManagementStorageService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierTemplatePersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.NotificationPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.ReportTemplatePersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadRedactionFileStatusRepository;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.FileSystemBackedArchiver;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.audit.AddNotificationRequest;
|
||||
@ -41,20 +30,25 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.download.Do
|
||||
import com.iqser.red.service.redaction.report.v1.api.model.ReportResultMessage;
|
||||
import com.iqser.red.service.redaction.report.v1.api.model.StoredFileInformation;
|
||||
import com.knecon.fforesight.tenantcommons.TenantContext;
|
||||
|
||||
import jakarta.transaction.Transactional;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
|
||||
public class DownloadPreparationService {
|
||||
static int MAX_RETRY = 2;
|
||||
|
||||
DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
FileManagementStorageService fileManagementStorageService;
|
||||
ReportTemplatePersistenceService reportTemplatePersistenceService;
|
||||
@ -65,7 +59,7 @@ public class DownloadPreparationService {
|
||||
ColorsService colorsService;
|
||||
FileManagementServiceSettings settings;
|
||||
DossierTemplatePersistenceService dossierTemplatePersistenceService;
|
||||
Map<String, List<RedactionFileResult>> redactionFileResultsMap;
|
||||
DownloadRedactionFileStatusRepository downloadRedactionFileStatusRepository;
|
||||
|
||||
|
||||
@Transactional
|
||||
@ -75,11 +69,9 @@ public class DownloadPreparationService {
|
||||
var downloadStatus = downloadStatusPersistenceService.getStatus(downloadId);
|
||||
RedactionMessage.RedactionMessageBuilder messageBuilder = this.generateGeneralRedactionMessage(downloadId, downloadStatus);
|
||||
|
||||
redactionFileResultsMap.put(downloadId, new ArrayList<>());
|
||||
|
||||
downloadStatus.getFiles().forEach(fileEntity -> {
|
||||
RedactionMessage message = messageBuilder.fileId(fileEntity.getId()).unapprovedFile(fileEntity.getWorkflowStatus() != WorkflowStatus.APPROVED).build();
|
||||
redactionFileResultsMap.get(downloadId).add(RedactionFileResult.builder().fileId(fileEntity.getId()).processed(false).retryCounter(0).build());
|
||||
log.info("Sending redaction request for downloadId:{} fileId:{} to pdftron-redaction-queue", downloadId, fileEntity.getId());
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
|
||||
});
|
||||
@ -138,54 +130,53 @@ public class DownloadPreparationService {
|
||||
return result;
|
||||
}
|
||||
|
||||
public void processingRedactionResultMessage(RedactionResultMessage redactionResultMessage) {
|
||||
@Transactional
|
||||
public void markFileAsProcessed(RedactionResultMessage redactionResultMessage) {
|
||||
|
||||
String downloadId = redactionResultMessage.getDownloadId();
|
||||
List<RedactionFileResult> redactionFileResults = redactionFileResultsMap.get(downloadId);
|
||||
if (redactionFileResults == null) {
|
||||
log.info("The creation of download has finished for downloadId: {} ", downloadId);
|
||||
return;
|
||||
}
|
||||
long totalFiles = downloadStatusPersistenceService.getStatus(downloadId).getFiles().size();
|
||||
Optional<RedactionFileResult> resultOptional = redactionFileResults.stream().filter(r -> r.getFileId().equals(redactionResultMessage.getFileId())).findFirst();
|
||||
resultOptional.ifPresent(redactionFileResult -> {
|
||||
redactionFileResult.setRedactionResultDetailList(redactionResultMessage.getRedactionResultDetails());
|
||||
redactionFileResult.setProcessed(true);
|
||||
});
|
||||
|
||||
var processedFilesCount = redactionFileResults.stream().filter(RedactionFileResult::isProcessed).count();
|
||||
log.info("Processed {} files out of total {} files for download {}", processedFilesCount, totalFiles, downloadId);
|
||||
|
||||
if (processedFilesCount == totalFiles) {
|
||||
createDownload(redactionFileResults, downloadId);
|
||||
}
|
||||
downloadRedactionFileStatusRepository.save(DownloadRedactionFileStatusEntity.builder()
|
||||
.id(UUID.randomUUID().toString())
|
||||
.downloadStorageId(redactionResultMessage.getDownloadId())
|
||||
.fileId(redactionResultMessage.getFileId())
|
||||
.processingErrorCounter(0)
|
||||
.details(redactionResultMessage.getRedactionResultDetails())
|
||||
.build());
|
||||
}
|
||||
|
||||
public void checkForRetryProcess(String downloadId, String fileId, boolean unapprovedFile) {
|
||||
|
||||
List<RedactionFileResult> redactionFileResults = redactionFileResultsMap.get(downloadId);
|
||||
if (redactionFileResults == null) {
|
||||
log.info("The creation of download has finished for downloadId: {} ", downloadId);
|
||||
return;
|
||||
}
|
||||
Optional<RedactionFileResult> resultOptional = redactionFileResults.stream().filter(r -> r.getFileId().equals(fileId)).findFirst();
|
||||
if (resultOptional.isPresent()) {
|
||||
RedactionFileResult result = resultOptional.get();
|
||||
if (result.getRetryCounter() >= MAX_RETRY) { // update download status
|
||||
log.info("Failed download after max retries: {} for downloadId: {}, set the download status to FAILED", result.getRetryCounter(), downloadId);
|
||||
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
|
||||
} else { // retry to send it again
|
||||
result.increaseRetryCounter();
|
||||
var downloadStatus = downloadStatusPersistenceService.getStatus(downloadId);
|
||||
RedactionMessage.RedactionMessageBuilder messageBuilder = this.generateGeneralRedactionMessage(downloadId, downloadStatus);
|
||||
RedactionMessage message = messageBuilder.fileId(fileId).unapprovedFile(unapprovedFile).build();
|
||||
log.info("Resending redaction request for downloadId:{} fileId: {} to {}", downloadId, fileId, MessagingConfiguration.PDFTRON_QUEUE);
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
|
||||
}
|
||||
}
|
||||
@Transactional
|
||||
public void markFileAsProcessingError(RedactionMessage redactionMessage) {
|
||||
|
||||
downloadRedactionFileStatusRepository.save(DownloadRedactionFileStatusEntity.builder()
|
||||
.id(UUID.randomUUID().toString())
|
||||
.downloadStorageId(redactionMessage.getDownloadId())
|
||||
.fileId(redactionMessage.getFileId())
|
||||
.processingErrorCounter(1)
|
||||
.build());
|
||||
}
|
||||
|
||||
public void createDownload(List<RedactionFileResult> redactionFileResults, String downloadId) {
|
||||
|
||||
@Transactional
|
||||
public void increaseProcessingErrorCounter(RedactionMessage redactionMessage, Integer numberOfErrors) {
|
||||
|
||||
downloadRedactionFileStatusRepository.updateStatusErrorInfo(redactionMessage.getDownloadId(), redactionMessage.getFileId(), numberOfErrors);
|
||||
}
|
||||
|
||||
|
||||
@Transactional
|
||||
public Optional<DownloadRedactionFileStatusEntity> getRedactionFileStatusEntry(String downloadStorageId, String fileId) {
|
||||
|
||||
return downloadRedactionFileStatusRepository.findByDownloadStorageIdAndFileId(downloadStorageId, fileId);
|
||||
}
|
||||
|
||||
|
||||
@Transactional
|
||||
public void clearRedactionStatusEntries(String downloadStorageId) {
|
||||
|
||||
downloadRedactionFileStatusRepository.deleteIfPresentByDownloadStorageId(downloadStorageId);
|
||||
}
|
||||
|
||||
|
||||
public void createDownload(List<DownloadRedactionFileStatusEntity> redactionFileResults, String downloadId) {
|
||||
|
||||
DownloadStatusEntity downloadStatus = downloadStatusPersistenceService.getStatus(downloadId);
|
||||
|
||||
@ -198,7 +189,6 @@ public class DownloadPreparationService {
|
||||
storeZipFile(downloadStatus, fileSystemBackedArchiver);
|
||||
|
||||
updateStatusToReady(downloadStatus, fileSystemBackedArchiver);
|
||||
redactionFileResultsMap.remove(downloadId);
|
||||
|
||||
notificationPersistenceService.insertNotification(AddNotificationRequest.builder()
|
||||
.userId(downloadStatus.getUserId())
|
||||
@ -211,7 +201,7 @@ public class DownloadPreparationService {
|
||||
}
|
||||
|
||||
downloadReportCleanupService.deleteTmpReportFiles(storedFileInformations.stream().map(StoredFileInformation::getStorageId).collect(Collectors.toSet()));
|
||||
redactionFileResults.forEach(redactionFileResult -> downloadReportCleanupService.deleteTmpReportFiles(redactionFileResult.getRedactionResultDetailList()
|
||||
redactionFileResults.forEach(redactionFileResult -> downloadReportCleanupService.deleteTmpReportFiles(redactionFileResult.getDetails()
|
||||
.stream()
|
||||
.map(RedactionResultDetail::getStorageId)
|
||||
.collect(Collectors.toSet())));
|
||||
@ -225,7 +215,7 @@ public class DownloadPreparationService {
|
||||
}
|
||||
|
||||
|
||||
private void generateAndAddFiles(DownloadStatusEntity downloadStatus, List<RedactionFileResult> redactionFileResults, FileSystemBackedArchiver fileSystemBackedArchiver) {
|
||||
private void generateAndAddFiles(DownloadStatusEntity downloadStatus, List<DownloadRedactionFileStatusEntity> redactionFileResults, FileSystemBackedArchiver fileSystemBackedArchiver) {
|
||||
|
||||
int i = 1;
|
||||
long fileGenerationStart = System.currentTimeMillis();
|
||||
@ -237,7 +227,7 @@ public class DownloadPreparationService {
|
||||
var isFileApproved = WorkflowStatus.APPROVED.equals(file.getWorkflowStatus());
|
||||
String filename = isFileApproved ? file.getFilename() : "UNAPPROVED_" + file.getFilename();
|
||||
for (DownloadFileType downloadFileType : downloadStatus.getDownloadFileTypes()) {
|
||||
Optional<RedactionFileResult> redactionFileResult = redactionFileResults.stream().filter(rfr -> rfr.getFileId().equals(file.getId())).findFirst();
|
||||
Optional<DownloadRedactionFileStatusEntity> redactionFileResult = redactionFileResults.stream().filter(rfr -> rfr.getFileId().equals(file.getId())).findFirst();
|
||||
|
||||
if (redactionFileResult.isEmpty()) {
|
||||
throw new RuntimeException();
|
||||
@ -248,15 +238,15 @@ public class DownloadPreparationService {
|
||||
}
|
||||
if (downloadFileType.name().equals(DownloadFileType.PREVIEW.name())) {
|
||||
fileSystemBackedArchiver.addEntry(new FileSystemBackedArchiver.ArchiveModel("Preview", addSuffix(filename, "highlighted"), //
|
||||
getPreview(file.getId(), redactionFileResult.get().getRedactionResultDetailList())));
|
||||
getPreview(file.getId(), redactionFileResult.get().getDetails())));
|
||||
}
|
||||
if (downloadFileType.name().equals(DownloadFileType.DELTA_PREVIEW.name())) {
|
||||
fileSystemBackedArchiver.addEntry(new FileSystemBackedArchiver.ArchiveModel("Delta Preview", addSuffix(filename, "delta_highlighted"), //
|
||||
getDeltaPreview(file.getId(), redactionFileResult.get().getRedactionResultDetailList())));
|
||||
getDeltaPreview(file.getId(), redactionFileResult.get().getDetails())));
|
||||
}
|
||||
if (downloadFileType.name().equals(DownloadFileType.REDACTED.name()) && isFileApproved) {
|
||||
fileSystemBackedArchiver.addEntry(new FileSystemBackedArchiver.ArchiveModel("Redacted", addSuffix(file.getFilename(), "redacted"), //
|
||||
getRedacted(file.getId(), redactionFileResult.get().getRedactionResultDetailList())));
|
||||
getRedacted(file.getId(), redactionFileResult.get().getDetails())));
|
||||
}
|
||||
|
||||
}
|
||||
@ -384,7 +374,7 @@ public class DownloadPreparationService {
|
||||
private void storeZipFile(DownloadStatusEntity downloadStatus, FileSystemBackedArchiver fileSystemBackedArchiver) {
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
try(var in = fileSystemBackedArchiver.toInputStream()) {
|
||||
try (var in = fileSystemBackedArchiver.toInputStream()) {
|
||||
fileManagementStorageService.storeObject(downloadStatus.getStorageId(), in);
|
||||
}
|
||||
log.info("Successfully stored zip for downloadId {}, took {}", downloadStatus.getStorageId(), System.currentTimeMillis() - start);
|
||||
|
||||
@ -1,23 +1,23 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.download;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionMessage;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.settings.FileManagementServiceSettings;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -25,40 +25,48 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
|
||||
public class RedactionDlqMessageReceiver {
|
||||
|
||||
private static final String LINE_SEPARATOR = System.lineSeparator();
|
||||
|
||||
ObjectMapper objectMapper;
|
||||
DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
DownloadPreparationService downloadPreparationService;
|
||||
FileManagementServiceSettings settings;
|
||||
RetryTemplate retryTemplate;
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.PDFTRON_DLQ)
|
||||
public void receive(Message message) throws IOException {
|
||||
|
||||
// Since we receive different message types here, we do not convert to an object here;
|
||||
// We just assume that the message contains a downloadId.
|
||||
JsonNode jsonNode = objectMapper.readTree(message.getBody());
|
||||
final String downloadId;
|
||||
final String fileId;
|
||||
try {
|
||||
downloadId = jsonNode.findValue("downloadId").asText();
|
||||
// Download packages will always be build on the pod that runs the scheduler, as we plan to replace the entire logic by downloading directory direct from storage, we leave it like that for now.
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("Received a message in the " + MessagingConfiguration.PDFTRON_DLQ + " that contains no downloadId" + LINE_SEPARATOR + "{}", jsonNode.asText());
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
RedactionMessage redactionMessage = objectMapper.readValue(message.getBody(), RedactionMessage.class);
|
||||
|
||||
try {
|
||||
fileId = jsonNode.findValue("fileId").asText();
|
||||
var unapproved = jsonNode.findValue("unapprovedFile");
|
||||
log.info("Received a dead message with downloadId: {}, fileId: {} check for retry", downloadId, fileId);
|
||||
downloadPreparationService.checkForRetryProcess(downloadId, fileId, unapproved.asBoolean());
|
||||
var fileEntryOptional = downloadPreparationService.getRedactionFileStatusEntry(redactionMessage.getDownloadId(), redactionMessage.getFileId());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.info("Received a dead message with downloadId: {}, updating the download as failed", downloadId);
|
||||
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
|
||||
if (fileEntryOptional.isPresent()) {
|
||||
var entry = fileEntryOptional.get();
|
||||
int numErrors = entry.getProcessingErrorCounter() + 1;
|
||||
if (numErrors >= settings.getMaxRedactionFileErrorRetries()) {
|
||||
setDownloadFailed(redactionMessage.getDownloadId());
|
||||
downloadPreparationService.clearRedactionStatusEntries(redactionMessage.getDownloadId());
|
||||
} else {
|
||||
downloadPreparationService.increaseProcessingErrorCounter(redactionMessage, numErrors);
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
|
||||
}
|
||||
} else {
|
||||
downloadPreparationService.markFileAsProcessingError(redactionMessage);
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.PDFTRON_QUEUE, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void setDownloadFailed(String downloadId) {
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.warn("Retrying {} time to set FAILED status for downloadJob with storageId: {}", retryContext.getRetryCount(), downloadId);
|
||||
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.FAILED);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,21 +1,19 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.download;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionResultMessage;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -48,7 +46,7 @@ public class RedactionResultMessageReceiver {
|
||||
|
||||
log.info("Received redaction results for downloadId: {} fileId: {}", redactionResultMessage.getDownloadId(), redactionResultMessage.getFileId());
|
||||
|
||||
downloadPreparationService.processingRedactionResultMessage(redactionResultMessage);
|
||||
downloadPreparationService.markFileAsProcessed(redactionResultMessage);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,54 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.job;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.download.DownloadPreparationService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadRedactionFileStatusRepository;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.TenantUtils;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.download.DownloadStatusValue;
|
||||
import com.knecon.fforesight.tenantcommons.TenantContext;
|
||||
import com.knecon.fforesight.tenantcommons.TenantProvider;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.quartz.Job;
|
||||
import org.quartz.JobExecutionContext;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
public class DownloadReadyJob implements Job {
|
||||
|
||||
private final DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
private final DownloadRedactionFileStatusRepository downloadRedactionFileStatusRepository;
|
||||
private final DownloadPreparationService downloadPreparationService;
|
||||
private final TenantProvider tenantProvider;
|
||||
|
||||
|
||||
@Override
|
||||
public void execute(JobExecutionContext jobExecutionContext) {
|
||||
|
||||
log.debug("Running DownloadReadyJob");
|
||||
|
||||
tenantProvider.getTenants().forEach(tenant -> {
|
||||
|
||||
if (!TenantUtils.isTenantReadyForPersistence(tenant)) {
|
||||
return;
|
||||
}
|
||||
|
||||
TenantContext.setTenantId(tenant.getTenantId());
|
||||
|
||||
downloadStatusPersistenceService.getStatus().stream().filter(d -> d.getStatus().equals(DownloadStatusValue.GENERATING)).forEach(download -> {
|
||||
|
||||
int numberOfFiles = download.getFiles().size();
|
||||
var downloadRedactionFileStatus = downloadRedactionFileStatusRepository.findAllByDownloadStorageId(download.getStorageId());
|
||||
if (downloadRedactionFileStatus.size() == numberOfFiles) {
|
||||
downloadPreparationService.createDownload(downloadRedactionFileStatus, download.getStorageId());
|
||||
downloadPreparationService.clearRedactionStatusEntries(download.getStorageId());
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.download.DownloadRedactionFileStatusEntity;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface DownloadRedactionFileStatusRepository extends JpaRepository<DownloadRedactionFileStatusEntity, String> {
|
||||
|
||||
List<DownloadRedactionFileStatusEntity> findAllByDownloadStorageId(String storageId);
|
||||
|
||||
Optional<DownloadRedactionFileStatusEntity> findByDownloadStorageIdAndFileId(String storageId, String fileId);
|
||||
|
||||
|
||||
@Modifying
|
||||
@Query("update DownloadRedactionFileStatusEntity e set e.processingErrorCounter = :processingErrorCounter where e.downloadStorageId = :downloadStorageId and e.fileId = :fileId")
|
||||
void updateStatusErrorInfo(String downloadStorageId, String fileId, Integer processingErrorCounter);
|
||||
|
||||
@Modifying
|
||||
Integer deleteIfPresentByDownloadStorageId(String downloadStorageId);
|
||||
|
||||
}
|
||||
@ -30,6 +30,7 @@ public class FileManagementServiceSettings {
|
||||
private boolean migrateOnly;
|
||||
|
||||
private int maxErrorRetries = 1;
|
||||
private int maxRedactionFileErrorRetries = 2;
|
||||
|
||||
private boolean cvTableParsingEnabled;
|
||||
|
||||
|
||||
@ -0,0 +1,40 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.utils;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionResultDetail;
|
||||
import jakarta.persistence.AttributeConverter;
|
||||
import jakarta.persistence.Converter;
|
||||
import lombok.SneakyThrows;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Converter
|
||||
public class JSONDownloadRedactionFileDetailsConverter implements AttributeConverter<List<RedactionResultDetail>, String> {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public String convertToDatabaseColumn(List<RedactionResultDetail> dataSet) {
|
||||
|
||||
return objectMapper.writeValueAsString(dataSet);
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public List<RedactionResultDetail> convertToEntityAttribute(String data) {
|
||||
|
||||
if (data == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
TypeReference<ArrayList<RedactionResultDetail>> typeRef = new TypeReference<>() {
|
||||
};
|
||||
return objectMapper.readValue(data, typeRef);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@ -164,4 +164,6 @@ databaseChangeLog:
|
||||
- include:
|
||||
file: db/changelog/tenant/111-make-rule-values-non-nullable.yaml
|
||||
- include:
|
||||
file: db/changelog/tenant/112-modify-section-length.yaml
|
||||
file: db/changelog/tenant/112-modify-section-length.yaml
|
||||
- include:
|
||||
file: db/changelog/tenant/114-add-download-redaction-file-status-table.yaml
|
||||
@ -0,0 +1,28 @@
|
||||
databaseChangeLog:
|
||||
- changeSet:
|
||||
id: download-redaction-file-status-table
|
||||
author: dom
|
||||
changes:
|
||||
- createTable:
|
||||
columns:
|
||||
- column:
|
||||
constraints:
|
||||
nullable: false
|
||||
primaryKey: true
|
||||
primaryKeyName: download_redaction_file_status_pkey
|
||||
name: id
|
||||
type: VARCHAR(255)
|
||||
- column:
|
||||
name: download_storage_id
|
||||
type: VARCHAR(255)
|
||||
- column:
|
||||
name: file_id
|
||||
type: VARCHAR(255)
|
||||
- column:
|
||||
name: processing_error_counter
|
||||
type: INTEGER
|
||||
- column:
|
||||
name: details
|
||||
type: VARCHAR(1024)
|
||||
tableName: download_redaction_file_status
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
databaseChangeLog:
|
||||
- changeSet:
|
||||
id: application-config-table
|
||||
id: download_redaction_file_result
|
||||
author: corina (generated)
|
||||
changes:
|
||||
- createTable:
|
||||
|
||||
@ -8,10 +8,14 @@
|
||||
<KeyValuePair key="version" value="${project.version}"/>
|
||||
</JSONLayout>
|
||||
</Console>
|
||||
<Console name="console" target="SYSTEM_OUT">
|
||||
<PatternLayout
|
||||
pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
|
||||
</Console>
|
||||
</Appenders>
|
||||
<Loggers>
|
||||
<Root level="info">
|
||||
<AppenderRef ref="JsonConsole"/>
|
||||
<AppenderRef ref="console"/>
|
||||
</Root>
|
||||
</Loggers>
|
||||
</Configuration>
|
||||
|
||||
@ -1,17 +1,21 @@
|
||||
package com.iqser.red.service.peristence.v1.server.integration.tests;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.job.DownloadReadyJob;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DownloadStatusPersistenceService;
|
||||
import com.knecon.fforesight.tenantcommons.TenantProvider;
|
||||
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.mock.web.MockMultipartFile;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@ -82,6 +86,14 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
|
||||
|
||||
DossierWithSingleFile testData;
|
||||
|
||||
@Autowired
|
||||
DownloadReadyJob downloadReadyJob;
|
||||
|
||||
@Autowired
|
||||
DownloadStatusPersistenceService downloadStatusPersistenceService;
|
||||
|
||||
|
||||
|
||||
|
||||
@BeforeEach
|
||||
public void createTestData() {
|
||||
@ -137,6 +149,12 @@ public class DownloadPreparationTest extends AbstractPersistenceServerServiceTes
|
||||
.redactionResultDetails(Collections.emptyList())
|
||||
.build());
|
||||
|
||||
// This set in DownloadMessageReceiver first async step via queue is unneeded and can be removed later.
|
||||
downloadStatusPersistenceService.updateStatus(downloadId, DownloadStatusValue.GENERATING);
|
||||
|
||||
when(this.tenantsClient.getTenants()).thenReturn(List.of(TenantResponse.builder().tenantId("redaction").details(Map.of("persistence-service-ready", true)).build()));
|
||||
downloadReadyJob.execute(null); // Will be called by scheduler in prod.
|
||||
|
||||
List<DownloadStatus> finalDownloadStatuses = downloadClient.getDownloadStatus().getDownloadStatus();
|
||||
assertThat(finalDownloadStatuses).hasSize(1);
|
||||
DownloadStatus finalDownloadStatus = finalDownloadStatuses.get(0);
|
||||
|
||||
@ -6,6 +6,11 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionResultDetail;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.RedactionType;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DownloadRedactionFileStatusRepository;
|
||||
import com.knecon.fforesight.tenantcommons.EncryptionDecryptionService;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@ -57,8 +62,15 @@ public class DownloadTest extends AbstractPersistenceServerServiceTest {
|
||||
@Autowired
|
||||
private DownloadPreparationService downloadPreparationService;
|
||||
|
||||
@Autowired
|
||||
DownloadRedactionFileStatusRepository downloadRedactionFileStatusRepository;
|
||||
|
||||
@Autowired
|
||||
EncryptionDecryptionService encryptionDecryptionService;
|
||||
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
@SneakyThrows
|
||||
public void testDownload() {
|
||||
|
||||
@ -91,8 +103,8 @@ public class DownloadTest extends AbstractPersistenceServerServiceTest {
|
||||
|
||||
downloadPreparationService.createDownload(ReportResultMessage.builder().downloadId(downloads.getStorageId()).build());
|
||||
|
||||
downloadPreparationService.processingRedactionResultMessage(RedactionResultMessage.builder().downloadId(downloads.getStorageId()).dossierId(dossier.getId()).fileId(file.getId()).build());
|
||||
downloadPreparationService.processingRedactionResultMessage(RedactionResultMessage.builder().downloadId(downloads.getStorageId()).dossierId(dossier.getId()).fileId(file2.getId()).build());
|
||||
downloadPreparationService.markFileAsProcessed(RedactionResultMessage.builder().downloadId(downloads.getStorageId()).dossierId(dossier.getId()).fileId(file.getId()).build());
|
||||
downloadPreparationService.markFileAsProcessed(RedactionResultMessage.builder().downloadId(downloads.getStorageId()).dossierId(dossier.getId()).fileId(file2.getId()).build());
|
||||
|
||||
var statuses = downloadClient.getDownloadStatus();
|
||||
assertThat(statuses.getDownloadStatus()).isNotEmpty();
|
||||
@ -104,4 +116,12 @@ public class DownloadTest extends AbstractPersistenceServerServiceTest {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMarkAsProcessed(){
|
||||
|
||||
downloadPreparationService.markFileAsProcessed(RedactionResultMessage.builder().downloadId("dsfsdf").dossierId("sdfsdf").fileId("rwerwe").redactionResultDetails(List.of(RedactionResultDetail.builder().redactionType(RedactionType.REDACTED).storageId("dsfsdf").build())).build());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -176,7 +176,7 @@ public abstract class AbstractPersistenceServerServiceTest {
|
||||
@Autowired
|
||||
private TokenService tokenService;
|
||||
@MockBean
|
||||
private TenantsClient tenantsClient;
|
||||
protected TenantsClient tenantsClient;
|
||||
@MockBean
|
||||
private UsersClient usersClient;
|
||||
@Autowired
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user