RED-4289: Add AMQP queues for cv-analysis (table-parsing)
This commit is contained in:
parent
126beb1815
commit
428bba22c5
@ -52,6 +52,10 @@ public class MessagingConfiguration {
|
||||
public static final String PDFTRON_DLQ = "pdftron_dlq";
|
||||
public static final String PDFTRON_RESULT_QUEUE = "pdftron_result_queue";
|
||||
|
||||
public static final String CV_ANALYSIS_QUEUE = "cv_analysis_request_queue";
|
||||
public static final String CV_ANALYSIS_RESPONSE_QUEUE = "cv_analysis_response_queue";
|
||||
public static final String CV_ANALYSIS_DLQ = "cv_analysis_dead_letter_queue";
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue nerRequestQueue() {
|
||||
@ -116,6 +120,27 @@ public class MessagingConfiguration {
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue cvAnalysisRequestQueue() {
|
||||
|
||||
return QueueBuilder.durable(CV_ANALYSIS_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CV_ANALYSIS_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue cvAnalysisResponseQueue() {
|
||||
|
||||
return QueueBuilder.durable(CV_ANALYSIS_RESPONSE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", CV_ANALYSIS_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue cvAnalysisResponseDLQ() {
|
||||
|
||||
return QueueBuilder.durable(CV_ANALYSIS_DLQ).build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue redactionQueue() {
|
||||
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
package com.iqser.red.service.peristence.v1.server.model;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class CvAnalysisServiceRequest {
|
||||
|
||||
public static final String OPERATION_TABLE_PARSING = "table_parsing";
|
||||
|
||||
private String dossierId;
|
||||
private String fileId;
|
||||
@Builder.Default
|
||||
private Set<Integer> pages = new HashSet<>();
|
||||
private String operation;
|
||||
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
package com.iqser.red.service.peristence.v1.server.model;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class CvAnalysisServiceResponse {
|
||||
|
||||
|
||||
public static final String OPERATION_TABLE_PARSING = "table_parsing";
|
||||
|
||||
private String dossierId;
|
||||
private String fileId;
|
||||
@Builder.Default
|
||||
private Set<Integer> pages = new HashSet<>();
|
||||
private String operation;
|
||||
private String responseFile;
|
||||
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
package com.iqser.red.service.peristence.v1.server.model;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class Pdf2ImageServiceResponse {
|
||||
|
||||
public static final String TARGET_FILE_EXTENSION = "ORIGIN.pdf.gz";
|
||||
public static final String OPERATION_CONVERSION = "conversion";
|
||||
|
||||
private String dossierId;
|
||||
private String fileId;
|
||||
private String targetFileExtension;
|
||||
@Builder.Default
|
||||
private Set<Integer> pages = new HashSet<>();
|
||||
private String operation;
|
||||
|
||||
}
|
||||
@ -0,0 +1,50 @@
|
||||
package com.iqser.red.service.peristence.v1.server.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.peristence.v1.server.model.CvAnalysisServiceResponse;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class CvAnalysisMessageReceiver {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.CV_ANALYSIS_RESPONSE_QUEUE)
|
||||
public void receive(String in) {
|
||||
|
||||
CvAnalysisServiceResponse response = objectMapper.readValue(in, CvAnalysisServiceResponse.class);
|
||||
|
||||
log.debug("{}", response);
|
||||
|
||||
log.info("Received message {} for dossierId {} and fileId {} and pages {}", MessagingConfiguration.CV_ANALYSIS_RESPONSE_QUEUE, response.getDossierId(), response.getFileId(), response.getPages());
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.CV_ANALYSIS_DLQ)
|
||||
public void handleDLQMessage(Message failedMessage) throws IOException {
|
||||
|
||||
HashMap<String, Object> cvAnalysisResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() {
|
||||
});
|
||||
String dossierId = (String) cvAnalysisResponse.get("dossierId");
|
||||
String fileId = (String) cvAnalysisResponse.get("fileId");
|
||||
|
||||
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.CV_ANALYSIS_DLQ, dossierId, fileId);
|
||||
}
|
||||
|
||||
}
|
||||
@ -20,6 +20,7 @@ import com.google.common.collect.Sets;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.DocumentRequest;
|
||||
import com.iqser.red.service.pdftron.redaction.v1.api.model.ProcessUntouchedDocumentRequest;
|
||||
import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.peristence.v1.server.model.CvAnalysisServiceRequest;
|
||||
import com.iqser.red.service.peristence.v1.server.model.NerServiceRequest;
|
||||
import com.iqser.red.service.peristence.v1.server.model.Pdf2ImageServiceRequest;
|
||||
import com.iqser.red.service.peristence.v1.server.model.image.ImageServiceRequest;
|
||||
@ -28,7 +29,6 @@ import com.iqser.red.service.peristence.v1.server.utils.FileModelMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileAttributeEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.entity.dossier.FileEntity;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.exception.InternalServerErrorException;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.ApplicationConfigService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.DossierPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileAttributeConfigPersistenceService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.FileStatusPersistenceService;
|
||||
@ -127,7 +127,7 @@ public class FileStatusService {
|
||||
addToAnalysisQueue(dossierId, fileId, false, Set.of());
|
||||
|
||||
if (fileManagementServiceSettings.isPdf2ImageServiceEnabled()) {
|
||||
addToPdf2ImageQueue(dossierId, fileId);
|
||||
addToPdf2ImageQueue(dossierId, fileId, Set.of());
|
||||
}
|
||||
|
||||
}
|
||||
@ -258,7 +258,7 @@ public class FileStatusService {
|
||||
public void createStatus(String dossierId, String fileId, String uploader, String filename) {
|
||||
|
||||
fileStatusPersistenceService.createStatus(dossierId, fileId, filename, uploader);
|
||||
addToAnalysisQueue(dossierId, fileId,false, Set.of());
|
||||
addToAnalysisQueue(dossierId, fileId, false, Set.of());
|
||||
}
|
||||
|
||||
|
||||
@ -377,13 +377,14 @@ public class FileStatusService {
|
||||
}
|
||||
|
||||
|
||||
public void addToPdf2ImageQueue(String dossierId, String fileId) {
|
||||
public void addToPdf2ImageQueue(String dossierId, String fileId, Set<Integer> pages) {
|
||||
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.PDF_2_IMAGE_QUEUE, objectMapper.writeValueAsString(Pdf2ImageServiceRequest.builder()
|
||||
.dossierId(dossierId)
|
||||
.fileId(fileId)
|
||||
.targetFileExtension(Pdf2ImageServiceRequest.TARGET_FILE_EXTENSION)
|
||||
.pages(pages)
|
||||
.operation(Pdf2ImageServiceRequest.OPERATION_CONVERSION)
|
||||
.build()), message -> {
|
||||
message.getMessageProperties().setPriority(1);
|
||||
@ -395,6 +396,24 @@ public class FileStatusService {
|
||||
}
|
||||
|
||||
|
||||
public void addToCvAnalysisRequestQueue(String dossierId, String fileId, Set<Integer> pages) {
|
||||
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.CV_ANALYSIS_QUEUE, objectMapper.writeValueAsString(CvAnalysisServiceRequest.builder()
|
||||
.dossierId(dossierId)
|
||||
.fileId(fileId)
|
||||
.pages(pages)
|
||||
.operation(CvAnalysisServiceRequest.OPERATION_TABLE_PARSING)
|
||||
.build()), message -> {
|
||||
message.getMessageProperties().setPriority(1);
|
||||
return message;
|
||||
});
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void updateLastOCRTime(String fileId) {
|
||||
|
||||
fileStatusPersistenceService.updateLastOCRTime(fileId, OffsetDateTime.now());
|
||||
@ -423,6 +442,7 @@ public class FileStatusService {
|
||||
}
|
||||
|
||||
|
||||
@Transactional
|
||||
public void deleteManualRedactions(String dossierId, String fileId) {
|
||||
|
||||
OffsetDateTime now = OffsetDateTime.now();
|
||||
@ -496,7 +516,7 @@ public class FileStatusService {
|
||||
var dossier = dossierPersistenceService.getAndValidateDossier(dossierId);
|
||||
var fileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
|
||||
if(!fileManagementStorageService.objectExists(dossierId, fileId, FileType.ORIGIN)){
|
||||
if (!fileManagementStorageService.objectExists(dossierId, fileId, FileType.ORIGIN)) {
|
||||
addToPreprocessingQueue(dossierId, fileId, fileEntity.getFilename());
|
||||
return;
|
||||
}
|
||||
|
||||
@ -43,6 +43,8 @@ public class ImageMessageReceiver {
|
||||
fileManagementStorageService.storeObject(dossierId, fileId, FileType.IMAGE_INFO, in);
|
||||
}
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
|
||||
|
||||
log.info("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
|
||||
}
|
||||
|
||||
|
||||
@ -55,6 +57,8 @@ public class ImageMessageReceiver {
|
||||
String fileId = (String) imageResponse.get("fileId");
|
||||
|
||||
fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId);
|
||||
|
||||
log.warn("Received message from {} for dossierId {} and fileId {}", MessagingConfiguration.IMAGE_SERVICE_DLQ, dossierId, fileId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -35,7 +35,7 @@ public class NerMessageReceiver {
|
||||
String dossierId = (String) entityResponse.get("dossierId");
|
||||
String fileId = (String) entityResponse.get("fileId");
|
||||
|
||||
log.info("Received NER Message from {} {}", dossierId, fileId);
|
||||
log.info("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.NER_SERVICE_RESPONSE_QUEUE, dossierId, fileId);
|
||||
fileStatusService.setStatusAnalyse(dossierId, fileId, false);
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ public class NerMessageReceiver {
|
||||
String dossierId = (String) entityResponse.get("dossierId");
|
||||
String fileId = (String) entityResponse.get("fileId");
|
||||
|
||||
log.info("Received NER DLQ Message from {} {}", dossierId, fileId);
|
||||
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.NER_SERVICE_DLQ, dossierId, fileId);
|
||||
fileStatusProcessingUpdateService.analysisFailed(dossierId, fileId);
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,57 @@
|
||||
package com.iqser.red.service.peristence.v1.server.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.peristence.v1.server.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.peristence.v1.server.model.Pdf2ImageServiceResponse;
|
||||
import com.iqser.red.service.peristence.v1.server.settings.FileManagementServiceSettings;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class Pdf2ImageMessageReceiver {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final FileStatusService fileStatusService;
|
||||
private final FileManagementServiceSettings settings;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.PDF_2_IMAGE_RESPONSE_QUEUE)
|
||||
public void receive(byte[] in) {
|
||||
|
||||
Pdf2ImageServiceResponse response = objectMapper.readValue(in, Pdf2ImageServiceResponse.class);
|
||||
|
||||
log.debug("{}", response);
|
||||
|
||||
log.info("Received message {} for dossierId {} and fileId {} and pages {}", MessagingConfiguration.PDF_2_IMAGE_RESPONSE_QUEUE, response.getDossierId(), response.getFileId(), response.getPages());
|
||||
|
||||
if (settings.isCvServiceEnabled()) {
|
||||
fileStatusService.addToCvAnalysisRequestQueue(response.getDossierId(), response.getFileId(), response.getPages());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = MessagingConfiguration.PDF_2_IMAGE_DLQ)
|
||||
public void handleDLQMessage(Message failedMessage) throws IOException {
|
||||
|
||||
HashMap<String, Object> pdf2ImageResponse = objectMapper.readValue(failedMessage.getBody(), new TypeReference<>() {
|
||||
});
|
||||
String dossierId = (String) pdf2ImageResponse.get("dossierId");
|
||||
String fileId = (String) pdf2ImageResponse.get("fileId");
|
||||
|
||||
log.warn("Received message {} for dossierId {} and fileId {}", MessagingConfiguration.PDF_2_IMAGE_DLQ, dossierId, fileId);
|
||||
}
|
||||
|
||||
}
|
||||
@ -32,4 +32,6 @@ public class FileManagementServiceSettings {
|
||||
|
||||
private int maxErrorRetries = 1;
|
||||
|
||||
private boolean cvServiceEnabled;
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user