RED-2440: Add to image-service queue after upload and analyze after image-service response
This commit is contained in:
parent
aaa9dadd2f
commit
ab1f691df0
@ -3,7 +3,7 @@ package com.iqser.red.service.persistence.service.v1.api.model.dossiertemplate.d
|
||||
import lombok.Getter;
|
||||
|
||||
public enum FileType {
|
||||
ORIGIN(".pdf"), REDACTION_LOG(".json"), SECTION_GRID(".json"), TEXT(".json"), NER_ENTITIES(".json");
|
||||
ORIGIN(".pdf"), REDACTION_LOG(".json"), SECTION_GRID(".json"), TEXT(".json"), NER_ENTITIES(".json"), IMAGE_INFO(".json");
|
||||
|
||||
@Getter
|
||||
private final String extension;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
package com.iqser.red.service.persistence.service.v1.api.model.dossiertemplate.dossier.file;
|
||||
|
||||
public enum ProcessingStatus {
|
||||
UNPROCESSED, REPROCESS, PROCESSING, PROCESSED, ERROR, DELETED, FULLREPROCESS, OCR_PROCESSING, INDEXING
|
||||
UNPROCESSED, REPROCESS, PROCESSING, PROCESSED, ERROR, DELETED, FULLREPROCESS, OCR_PROCESSING, INDEXING, IMAGE_ANALYZING
|
||||
}
|
||||
|
||||
@ -31,6 +31,37 @@ public class MessagingConfiguration {
|
||||
public static final String DELETE_FROM_INDEX_QUEUE = "deleteFromIndexQueue";
|
||||
public static final String DELETE_FROM_INDEX_DLQ = "deleteFromIndexDLQ";
|
||||
|
||||
public static final String IMAGE_SERVICE_QUEUE = "image_request_queue";
|
||||
public static final String IMAGE_SERVICE_RESPONSE_QUEUE = "image_response_queue";
|
||||
public static final String IMAGE_SERVICE_DLQ = "image_dead_letter_queue";
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue imageRequestQueue() {
|
||||
|
||||
return QueueBuilder.durable(IMAGE_SERVICE_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", IMAGE_SERVICE_DLQ)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue imageResponseQueue() {
|
||||
|
||||
return QueueBuilder.durable(IMAGE_SERVICE_RESPONSE_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", IMAGE_SERVICE_DLQ)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue imageResponseDLQ() {
|
||||
|
||||
return QueueBuilder.durable(IMAGE_SERVICE_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue redactionQueue() {
|
||||
|
||||
@ -0,0 +1,17 @@
|
||||
package com.iqser.red.service.peristence.v1.server.model.image;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class ImageServiceRequest {
|
||||
|
||||
private String dossierId;
|
||||
private String fileId;
|
||||
|
||||
}
|
||||
@ -88,7 +88,7 @@ public class FileService {
|
||||
} else {
|
||||
// the file is new, should create a new status for it.
|
||||
log.info("File {} has no status yet, creating one and setting to unprocessed.", request.getFilename());
|
||||
fileStatusService.createStatus(request.getDossierId(), fileId, request.getUploader(), request.getFilename(), 1);
|
||||
fileStatusService.createStatus(request.getDossierId(), fileId, request.getUploader(), request.getFilename());
|
||||
}
|
||||
return new JSONPrimitive<>(fileId);
|
||||
}
|
||||
|
||||
@ -6,6 +6,8 @@ import com.google.common.collect.Sets;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.OcrRequestMessage;
|
||||
import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.peristence.v1.server.controller.RulesController;
|
||||
import com.iqser.red.service.peristence.v1.server.model.image.ImageServiceRequest;
|
||||
import com.iqser.red.service.peristence.v1.server.settings.FileManagementServiceSettings;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileAttributeEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.exception.UserNotFoundException;
|
||||
@ -19,11 +21,13 @@ import com.iqser.red.service.redaction.v1.model.MessageType;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.transaction.Transactional;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -50,6 +54,8 @@ public class FileStatusService {
|
||||
private final RemoveRedactionPersistenceService removeRedactionPersistenceService;
|
||||
private final AddRedactionPersistenceService addRedactionPersistenceService;
|
||||
private final ResizeRedactionPersistenceService resizeRedactionPersistenceService;
|
||||
private final FileManagementServiceSettings settings;
|
||||
|
||||
|
||||
public List<FileEntity> getActiveFiles(String dossierId) {
|
||||
|
||||
@ -86,6 +92,11 @@ public class FileStatusService {
|
||||
fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.PROCESSING);
|
||||
}
|
||||
|
||||
public void setStatusImageAnalyzing(String fileId) {
|
||||
|
||||
fileStatusPersistenceService.updateProcessingStatus(fileId, ProcessingStatus.IMAGE_ANALYZING);
|
||||
}
|
||||
|
||||
|
||||
public void setStatusSuccessful(String dossierId, String fileId, AnalyzeResult analyzeResult) {
|
||||
|
||||
@ -106,10 +117,15 @@ public class FileStatusService {
|
||||
|
||||
|
||||
@Transactional
|
||||
public void createStatus(String dossierId, String fileId, String uploader, String filename, int priority) {
|
||||
public void createStatus(String dossierId, String fileId, String uploader, String filename) {
|
||||
|
||||
fileStatusPersistenceService.createStatus(dossierId, fileId, filename, uploader);
|
||||
addToAnalysisQueue(dossierId, fileId, priority, Sets.newHashSet());
|
||||
|
||||
if(settings.isImageServiceEnabled()) {
|
||||
addToImageQueue(dossierId, fileId);
|
||||
} else {
|
||||
addToAnalysisQueue(dossierId, fileId, 1, Set.of());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -212,11 +228,11 @@ public class FileStatusService {
|
||||
FileEntity fileStatus = fileStatusPersistenceService.getStatus(fileId);
|
||||
String lastReviewer = fileStatus.getLastReviewer();
|
||||
String lastApprover = fileStatus.getLastApprover();
|
||||
if(WorkflowStatus.UNDER_REVIEW.equals(fileStatus.getWorkflowStatus())) {
|
||||
lastReviewer = StringUtils.isNotEmpty(assignee)? assignee: fileStatus.getAssignee();
|
||||
if (WorkflowStatus.UNDER_REVIEW.equals(fileStatus.getWorkflowStatus())) {
|
||||
lastReviewer = StringUtils.isNotEmpty(assignee) ? assignee : fileStatus.getAssignee();
|
||||
}
|
||||
if(WorkflowStatus.UNDER_APPROVAL.equals(fileStatus.getWorkflowStatus())) {
|
||||
lastApprover = StringUtils.isNotEmpty(assignee)? assignee: fileStatus.getAssignee();
|
||||
if (WorkflowStatus.UNDER_APPROVAL.equals(fileStatus.getWorkflowStatus())) {
|
||||
lastApprover = StringUtils.isNotEmpty(assignee) ? assignee : fileStatus.getAssignee();
|
||||
}
|
||||
|
||||
fileStatusPersistenceService.setAssignee(fileId, assignee, lastReviewer, lastApprover);
|
||||
@ -242,7 +258,8 @@ public class FileStatusService {
|
||||
|
||||
var fileAttributes = fileStatus.getFileAttributes();
|
||||
|
||||
boolean reanalyse = !fileStatus.getProcessingStatus().equals(ProcessingStatus.UNPROCESSED) && !fileStatus.getProcessingStatus()
|
||||
boolean reanalyse = !fileStatus.getProcessingStatus()
|
||||
.equals(ProcessingStatus.UNPROCESSED) && !fileStatus.getProcessingStatus()
|
||||
.equals(ProcessingStatus.FULLREPROCESS) && fileStatus.getRulesVersion() == rulesController.getVersion(dossier.getDossierTemplateId()) && (fileStatus.getLastFileAttributeChange() == null || fileStatus.getLastProcessed()
|
||||
.isAfter(fileStatus.getLastFileAttributeChange()));
|
||||
|
||||
@ -258,7 +275,6 @@ public class FileStatusService {
|
||||
.excludedPages(fileStatus.getExcludedPages())
|
||||
.build();
|
||||
|
||||
|
||||
setStatusProcessing(fileId);
|
||||
|
||||
try {
|
||||
@ -272,6 +288,20 @@ public class FileStatusService {
|
||||
}
|
||||
|
||||
|
||||
private void addToImageQueue(String dossierId, String fileId) {
|
||||
|
||||
setStatusImageAnalyzing(fileId);
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.IMAGE_SERVICE_QUEUE, objectMapper.writeValueAsString(new ImageServiceRequest(dossierId, fileId)), message -> {
|
||||
message.getMessageProperties().setPriority(1);
|
||||
return message;
|
||||
});
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void addToOcrQueue(String dossierId, String fileId, int priority) {
|
||||
|
||||
try {
|
||||
@ -307,6 +337,7 @@ public class FileStatusService {
|
||||
|
||||
|
||||
public void wipeFileData(String dossierId, String fileId) {
|
||||
|
||||
OffsetDateTime now = OffsetDateTime.now();
|
||||
// remove everything related to redaction
|
||||
fileManagementStorageService.deleteObject(dossierId, fileId, FileType.REDACTION_LOG);
|
||||
@ -314,12 +345,12 @@ public class FileStatusService {
|
||||
|
||||
// wipe comments
|
||||
var comments = commentPersistenceService.findCommentsByFileID(fileId, false);
|
||||
comments.forEach((key, value) -> value.forEach(comment ->
|
||||
commentPersistenceService.softDelete(comment.getId(), now)));
|
||||
comments.forEach((key, value) -> value.forEach(comment -> commentPersistenceService.softDelete(comment.getId(), now)));
|
||||
|
||||
// wipe force redactions
|
||||
var forceRedactions = forceRedactionPersistenceService.findForceRedactions(fileId, false);
|
||||
forceRedactions.forEach(f -> forceRedactionPersistenceService.softDelete(fileId, f.getId().getAnnotationId(), now));
|
||||
forceRedactions.forEach(f -> forceRedactionPersistenceService.softDelete(fileId, f.getId()
|
||||
.getAnnotationId(), now));
|
||||
|
||||
// wipe add manual redactions
|
||||
var addRedactions = addRedactionPersistenceService.findAddRedactions(fileId, false);
|
||||
@ -327,19 +358,23 @@ public class FileStatusService {
|
||||
|
||||
// wipe removeRedactions
|
||||
var removeRedactions = removeRedactionPersistenceService.findRemoveRedactions(fileId, false);
|
||||
removeRedactions.forEach(f -> removeRedactionPersistenceService.softDelete(fileId, f.getId().getAnnotationId(), now));
|
||||
removeRedactions.forEach(f -> removeRedactionPersistenceService.softDelete(fileId, f.getId()
|
||||
.getAnnotationId(), now));
|
||||
|
||||
// wipe image recat
|
||||
var imageRecategorizations = imageRecategorizationPersistenceService.findRecategorizations(fileId, false);
|
||||
imageRecategorizations.forEach(f -> imageRecategorizationPersistenceService.softDelete(fileId, f.getId().getAnnotationId(), now));// wipe image recat
|
||||
imageRecategorizations.forEach(f -> imageRecategorizationPersistenceService.softDelete(fileId, f.getId()
|
||||
.getAnnotationId(), now));// wipe image recat
|
||||
|
||||
// wipe resize redactions
|
||||
var resizeRedactions = resizeRedactionPersistenceService.findResizeRedactions(fileId, false);
|
||||
resizeRedactions.forEach(f -> resizeRedactionPersistenceService.softDelete(fileId, f.getId().getAnnotationId(), now));
|
||||
resizeRedactions.forEach(f -> resizeRedactionPersistenceService.softDelete(fileId, f.getId()
|
||||
.getAnnotationId(), now));
|
||||
|
||||
// wipe legal basis changes
|
||||
var legalBasisChanges = legalBasisChangePersistenceService.findLegalBasisChanges(fileId, false);
|
||||
legalBasisChanges.forEach(f -> legalBasisChangePersistenceService.softDelete(fileId, f.getId().getAnnotationId(), now));
|
||||
legalBasisChanges.forEach(f -> legalBasisChangePersistenceService.softDelete(fileId, f.getId()
|
||||
.getAnnotationId(), now));
|
||||
|
||||
fileStatusPersistenceService.updateHasComments(fileId, false);
|
||||
}
|
||||
@ -363,11 +398,16 @@ public class FileStatusService {
|
||||
return fileAttributeList;
|
||||
}
|
||||
|
||||
|
||||
public boolean hasChangesSince(String dossierId, OffsetDateTime since) {
|
||||
|
||||
return fileStatusPersistenceService.hasChangesSince(dossierId, since);
|
||||
}
|
||||
|
||||
|
||||
public int countSoftDeletedFiles(String dossierId) {
|
||||
|
||||
return fileStatusPersistenceService.countSoftDeletedFiles(dossierId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,57 @@
|
||||
package com.iqser.red.service.peristence.v1.server.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.service.v1.api.model.dossiertemplate.dossier.file.FileType;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ImageMessageReceiver {
|
||||
|
||||
private final FileManagementStorageService fileManagementStorageService;
|
||||
private final FileStatusService fileStatusService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.IMAGE_SERVICE_RESPONSE_QUEUE)
|
||||
public void receive(byte[] in) {
|
||||
|
||||
HashMap<String, Object> imageResponse = objectMapper.readValue(in, new TypeReference<>() {
|
||||
});
|
||||
|
||||
String dossierId = (String) imageResponse.get("dossierId");
|
||||
String fileId = (String) imageResponse.get("fileId");
|
||||
|
||||
fileManagementStorageService.storeObject(dossierId, fileId, FileType.IMAGE_INFO, in);
|
||||
fileStatusService.addToAnalysisQueue(dossierId, fileId, 1, Set.of());
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.IMAGE_SERVICE_DLQ)
|
||||
public void handleDLQMessage(Message failedMessage) throws IOException {
|
||||
|
||||
HashMap<String, Object> imageResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() {
|
||||
});
|
||||
String dossierId = (String) imageResponse.get("dossierId");
|
||||
String fileId = (String) imageResponse.get("fileId");
|
||||
|
||||
fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId);
|
||||
}
|
||||
|
||||
}
|
||||
@ -20,4 +20,6 @@ public class FileManagementServiceSettings {
|
||||
private int softDeleteCleanupTime = 96;
|
||||
|
||||
private boolean migrateOnly;
|
||||
|
||||
private boolean imageServiceEnabled = true;
|
||||
}
|
||||
|
||||
@ -94,7 +94,7 @@ public class DossierStatsTest extends AbstractPersistenceServerServiceTest {
|
||||
assertThat(dossierStats.isHasSuggestionsFilePresent()).isTrue();
|
||||
assertThat(dossierStats.isHasUpdatesFilePresent()).isTrue();
|
||||
assertThat(dossierStats.isHasNoFlagsFilePresent()).isFalse();
|
||||
assertThat(dossierStats.getFileCountPerProcessingStatus().get(ProcessingStatus.PROCESSING)).isEqualTo(2);
|
||||
assertThat(dossierStats.getFileCountPerProcessingStatus().get(ProcessingStatus.IMAGE_ANALYZING)).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user