RED-3611: Use priority queue for user triggerd actions
This commit is contained in:
parent
968292ca75
commit
c43f052afb
@ -5,7 +5,6 @@ import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.exception.BadRequestException;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.exception.NotFoundException;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.DossierRepository;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.FileAttributesRepository;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.repository.FileRepository;
|
||||
import com.iqser.red.service.persistence.service.v1.api.model.dossiertemplate.dossier.file.ProcessingStatus;
|
||||
@ -27,7 +26,6 @@ import java.util.stream.Collectors;
|
||||
public class FileStatusPersistenceService {
|
||||
|
||||
private final FileRepository fileRepository;
|
||||
private final DossierRepository dossierRepository;
|
||||
private final FileAttributesRepository fileAttributesRepository;
|
||||
private final FileAttributeConfigPersistenceService fileAttributeConfigPersistenceService;
|
||||
private final DossierPersistenceService dossierService;
|
||||
@ -162,7 +160,7 @@ public class FileStatusPersistenceService {
|
||||
|
||||
var fileAttributeEntities = convertFileAttributes(dossierId, file, fileAttributes);
|
||||
fileAttributesRepository.saveAllAndFlush(fileAttributeEntities);
|
||||
fileRepository.updateLastAttributeChangeDate(fileId, OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS));
|
||||
fileRepository.updateLastAttributeChangeDate(fileId,OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS));
|
||||
|
||||
}, () -> {
|
||||
throw new NotFoundException("Unknown file=" + fileId);
|
||||
@ -362,7 +360,7 @@ public class FileStatusPersistenceService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void updateFileModificationDate(String fileId, OffsetDateTime fileManipulationDate){
|
||||
public void updateFileModificationDate(String fileId, OffsetDateTime fileManipulationDate) {
|
||||
fileRepository.updateFileModificationDate(fileId, fileManipulationDate);
|
||||
}
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@ import lombok.RequiredArgsConstructor;
|
||||
public class MessagingConfiguration {
|
||||
|
||||
public static final String REDACTION_QUEUE = "redactionQueue";
|
||||
public static final String REDACTION_PRIORITY_QUEUE = "redactionPriorityQueue";
|
||||
public static final String REDACTION_DQL = "redactionDQL";
|
||||
|
||||
public static final String DOWNLOAD_QUEUE = "downloadQueue";
|
||||
@ -106,6 +107,17 @@ public class MessagingConfiguration {
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue redactionPriorityQueue() {
|
||||
|
||||
return QueueBuilder.durable(REDACTION_PRIORITY_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", REDACTION_DQL)
|
||||
.maxPriority(2)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue ocrQueue() {
|
||||
|
||||
|
||||
@ -129,6 +129,7 @@ public class FileAttributesController implements FileAttributesResource {
|
||||
@PathVariable(FILE_ID) String fileId, @RequestBody Map<String, String> fileAttributesMap) {
|
||||
|
||||
fileStatusPersistenceService.setFileAttributes(dossierId, fileId, fileAttributesMap);
|
||||
fileStatusService.setStatusReprocess(dossierId, fileId, true);
|
||||
indexingService.addToIndexingQueue(IndexMessageType.UPDATE, null, dossierId, fileId, 2);
|
||||
}
|
||||
|
||||
|
||||
@ -43,7 +43,6 @@ public class ReanalysisController implements ReanalysisResource {
|
||||
private final FileStatusService fileStatusService;
|
||||
private final DossierPersistenceService dossierPersistenceService;
|
||||
private final IndexingService indexingService;
|
||||
private final ReanalysisRequiredStatusService reanalysisRequiredStatusService;
|
||||
private final PDFTronRedactionClient pDFTronRedactionClient;
|
||||
|
||||
|
||||
@ -106,7 +105,7 @@ public class ReanalysisController implements ReanalysisResource {
|
||||
throw new BadRequestException("The files differ in number of pages");
|
||||
}
|
||||
pDFTronRedactionClient.importRedactions(documentRequest);
|
||||
fileStatusService.setStatusFullReprocess(documentRequest.getDossierId(), documentRequest.getFileId(), 1);
|
||||
fileStatusService.setStatusFullReprocess(documentRequest.getDossierId(), documentRequest.getFileId(), true);
|
||||
}
|
||||
|
||||
|
||||
@ -118,7 +117,7 @@ public class ReanalysisController implements ReanalysisResource {
|
||||
fileStatusService.updateFileModificationDate(textHighlightRequest.getFileId(), OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS));
|
||||
}
|
||||
if (textHighlightRequest.getOperation().equals(TextHighlightOperation.CONVERT)) {
|
||||
fileStatusService.setStatusFullReprocess(textHighlightRequest.getDossierId(), textHighlightRequest.getFileId(), 1);
|
||||
fileStatusService.setStatusFullReprocess(textHighlightRequest.getDossierId(), textHighlightRequest.getFileId(), true);
|
||||
}
|
||||
return textHighlightResponse;
|
||||
}
|
||||
@ -191,11 +190,11 @@ public class ReanalysisController implements ReanalysisResource {
|
||||
|
||||
if (force) {
|
||||
filesToReanalyse.forEach(file -> {
|
||||
fileStatusService.setStatusReprocess(dossierId, file.getId(), 2, true);
|
||||
fileStatusService.setStatusReprocess(dossierId, file.getId(), filesToReanalyse.size() == 1 ? true : false, true);
|
||||
});
|
||||
} else {
|
||||
filesToReanalyse.stream().filter(FileModel::isReanalysisRequired).forEach(file -> {
|
||||
fileStatusService.setStatusReprocess(dossierId, file.getId(), 2, true);
|
||||
fileStatusService.setStatusReprocess(dossierId, file.getId(), filesToReanalyse.size() == 1 ? true : false, true);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ public class ExcludeFromAnalysisService {
|
||||
|
||||
if (!excluded) {
|
||||
// if file has been re-enabled - process it
|
||||
fileStatusService.setStatusFullReprocess(dossierId, fileId, 2);
|
||||
fileStatusService.setStatusFullReprocess(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
}
|
||||
@ -37,7 +37,7 @@ public class ExcludeFromAnalysisService {
|
||||
|
||||
if (!excludedFromAutomaticAnalysis) {
|
||||
// if file has been re-enabled - process it
|
||||
fileStatusService.setStatusFullReprocess(dossierId, fileId, 2);
|
||||
fileStatusService.setStatusFullReprocess(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -89,7 +89,7 @@ public class FileService {
|
||||
if (existingStatus != null) {
|
||||
// the file is already uploaded, just reanalyse it.
|
||||
|
||||
fileStatusService.overwriteFile(request.getDossierId(), fileId, request.getUploader(), request.getFilename(), request.getData().length);
|
||||
fileStatusService.overwriteFile(request.getDossierId(), fileId, request.getUploader(), request.getFilename());
|
||||
} else {
|
||||
// the file is new, should create a new status for it.
|
||||
log.info("File {} has no status yet, creating one and setting to unprocessed.", request.getFilename());
|
||||
|
||||
@ -37,8 +37,12 @@ public class FileStatusProcessingUpdateService {
|
||||
if (settings.isNerServiceEnabled() && !fileManagementStorageService.nerEntitiesExists(dossierId, fileId)) {
|
||||
fileStatusService.addToNerQueue(dossierId, fileId);
|
||||
} else {
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, 1);
|
||||
fileStatusService.addToAnalysisQueue(dossierId, fileId, 2, null);
|
||||
|
||||
//TODO This might be also priority depending on what was the pervious call.
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
|
||||
|
||||
//TODO This might be also priority depending on what was the pervious call.
|
||||
fileStatusService.addToAnalysisQueue(dossierId, fileId, false, null);
|
||||
}
|
||||
break;
|
||||
|
||||
@ -73,7 +77,7 @@ public class FileStatusProcessingUpdateService {
|
||||
log.info("OCR Successful for dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
|
||||
|
||||
fileStatusService.updateLastOCRTime(fileId);
|
||||
fileStatusService.setStatusFullReprocess(dossierId, fileId, 2);
|
||||
fileStatusService.setStatusFullReprocess(dossierId, fileId, false);
|
||||
|
||||
return null;
|
||||
});
|
||||
|
||||
@ -147,7 +147,7 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void setStatusReprocess(String dossierId, String fileId, int priority, Set<Integer> sectionsToReanalyse,
|
||||
public void setStatusReprocess(String dossierId, String fileId, boolean priority, Set<Integer> sectionsToReanalyse,
|
||||
boolean triggeredManually) {
|
||||
|
||||
log.info("Reprocessing file: {} from dossier {}", fileId, dossierId);
|
||||
@ -182,7 +182,7 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void setStatusFullReprocess(String dossierId, String fileId, int priority) {
|
||||
public void setStatusFullReprocess(String dossierId, String fileId, boolean priority) {
|
||||
|
||||
FileEntity fileStatus = fileStatusPersistenceService.getStatus(fileId);
|
||||
|
||||
@ -202,7 +202,7 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
protected void addToAnalysisQueue(String dossierId, String fileId, int priority, Set<Integer> sectionsToReanalyse) {
|
||||
protected void addToAnalysisQueue(String dossierId, String fileId, boolean priority, Set<Integer> sectionsToReanalyse) {
|
||||
|
||||
var dossier = dossierPersistenceService.getAndValidateDossier(dossierId);
|
||||
var fileStatus = fileStatusPersistenceService.getStatus(fileId);
|
||||
@ -246,10 +246,11 @@ public class FileStatusService {
|
||||
setStatusProcessing(fileId);
|
||||
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_QUEUE, objectMapper.writeValueAsString(analyseRequest), message -> {
|
||||
message.getMessageProperties().setPriority(priority);
|
||||
return message;
|
||||
});
|
||||
if(priority){
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_PRIORITY_QUEUE, objectMapper.writeValueAsString(analyseRequest));
|
||||
} else {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.REDACTION_QUEUE, objectMapper.writeValueAsString(analyseRequest));
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -283,14 +284,14 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void setStatusReprocess(String dossierId, String fileId, int priority) {
|
||||
public void setStatusReprocess(String dossierId, String fileId, boolean priority) {
|
||||
|
||||
setStatusReprocess(dossierId, fileId, priority, Sets.newHashSet(), false);
|
||||
}
|
||||
|
||||
|
||||
@Transactional
|
||||
public void setStatusReprocess(String dossierId, String fileId, int priority, boolean triggeredManually) {
|
||||
public void setStatusReprocess(String dossierId, String fileId, boolean priority, boolean triggeredManually) {
|
||||
|
||||
setStatusReprocess(dossierId, fileId, priority, Sets.newHashSet(), triggeredManually);
|
||||
}
|
||||
@ -300,7 +301,7 @@ public class FileStatusService {
|
||||
public void createStatus(String dossierId, String fileId, String uploader, String filename) {
|
||||
|
||||
fileStatusPersistenceService.createStatus(dossierId, fileId, filename, uploader);
|
||||
addToAnalysisQueue(dossierId, fileId, 1, Set.of());
|
||||
addToAnalysisQueue(dossierId, fileId, false, Set.of());
|
||||
}
|
||||
|
||||
|
||||
@ -432,11 +433,11 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void overwriteFile(String dossierId, String fileId, String uploader, String filename, int length) {
|
||||
public void overwriteFile(String dossierId, String fileId, String uploader, String filename) {
|
||||
|
||||
fileStatusPersistenceService.overwriteFile(fileId, uploader, filename);
|
||||
wipeFileData(dossierId, fileId);
|
||||
setStatusFullReprocess(dossierId, fileId, length);
|
||||
setStatusFullReprocess(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
|
||||
@ -518,7 +519,7 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void setStatusAnalyse(String dossierId, String fileId, int priority) {
|
||||
public void setStatusAnalyse(String dossierId, String fileId, boolean priority) {
|
||||
|
||||
FileEntity fileStatus = fileStatusPersistenceService.getStatus(fileId);
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ public class ImageMessageReceiver {
|
||||
if(settings.isStoreImageFile()) {
|
||||
fileManagementStorageService.storeObject(dossierId, fileId, FileType.IMAGE_INFO, in);
|
||||
}
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, 1);
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -649,7 +649,7 @@ public class ManualRedactionService {
|
||||
|
||||
private void reprocess(String dossierId, String fileId) {
|
||||
|
||||
fileStatusService.setStatusReprocess(dossierId, fileId, 2);
|
||||
fileStatusService.setStatusReprocess(dossierId, fileId, true);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -36,7 +36,7 @@ public class NerMessageReceiver {
|
||||
String fileId = (String) entityResponse.get("fileId");
|
||||
|
||||
log.info("Received NER Message from {} {}", dossierId, fileId);
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, 1);
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -63,10 +63,10 @@ public class AutomaticAnalysisScheduler {
|
||||
|
||||
if (next.isFullAnalysisRequired()) {
|
||||
log.info("Queued file: {} for automatic full analysis! ", next.getFilename());
|
||||
fileStatusService.setStatusFullReprocess(next.getDossierId(), next.getId(), 1);
|
||||
fileStatusService.setStatusFullReprocess(next.getDossierId(), next.getId(), false);
|
||||
} else if (next.isReanalysisRequired()) {
|
||||
log.info("Queued file: {} for automatic reanalysis! ", next.getFilename());
|
||||
fileStatusService.setStatusReprocess(next.getDossierId(), next.getId(), 1);
|
||||
fileStatusService.setStatusReprocess(next.getDossierId(), next.getId(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user