CLARI-30 - reworked OCR service integration to work via queue only, depercated old rest endpoints. Renamed queues to naming convetion.
This commit is contained in:
parent
2fa7b4337a
commit
d849859def
@ -57,6 +57,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrSuccessful(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId);
|
||||
@ -81,12 +82,14 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrProcessing(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrProcessing(dossierId, fileId);
|
||||
fileStatusProcessingUpdateService.ocrProcessing(fileId);
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrFailed(dossierId,
|
||||
@ -98,6 +101,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId, @RequestBody FileErrorInfo fileErrorInfo) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrFailed(dossierId, fileId, fileErrorInfo);
|
||||
|
||||
@ -35,8 +35,11 @@ public class MessagingConfiguration {
|
||||
public static final String REPORT_RESULT_QUEUE = "reportResultQueue";
|
||||
public static final String REPORT_RESULT_DLQ = "reportResultDLQ";
|
||||
|
||||
public static final String OCR_QUEUE = "ocrQueue";
|
||||
public static final String OCR_DLQ = "ocrDLQ";
|
||||
public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
|
||||
public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
|
||||
public static final String OCR_DLQ = "ocr_dead_letter_queue";
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue";
|
||||
|
||||
public static final String INDEXING_QUEUE = "indexingQueue";
|
||||
public static final String INDEXING_DQL = "indexingDQL";
|
||||
@ -68,9 +71,6 @@ public class MessagingConfiguration {
|
||||
public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue";
|
||||
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue";
|
||||
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_response_dql";
|
||||
|
||||
public static final String X_ERROR_INFO_HEADER = "x-error-message";
|
||||
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
|
||||
|
||||
@ -241,7 +241,7 @@ public class MessagingConfiguration {
|
||||
@Bean
|
||||
public Queue ocrQueue() {
|
||||
|
||||
return QueueBuilder.durable(OCR_QUEUE)
|
||||
return QueueBuilder.durable(OCR_REQUEST_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", OCR_DLQ)
|
||||
.withArgument("x-max-priority", 2) // Higher value is higher priority.
|
||||
|
||||
@ -15,5 +15,6 @@ public class OCRStatusUpdateResponse {
|
||||
private int numberOfPagesToOCR;
|
||||
private int numberOfOCRedPages;
|
||||
private boolean ocrFinished;
|
||||
private boolean ocrStarted;
|
||||
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@ -78,16 +79,16 @@ public class FileStatusProcessingUpdateService {
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.info("Preprocessing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
|
||||
fileStatusService.setStatusPreProcessing(fileId,
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
return null;
|
||||
});
|
||||
|
||||
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
|
||||
throw new ConflictException(String.format("Max Processing Retries exhausted for dossier %s and file %s with errorCount: %s",
|
||||
dossierId,
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
dossierId,
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,33 +106,37 @@ public class FileStatusProcessingUpdateService {
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.warn("Retrying {} time to set ERROR status for file {} in dossier {} with reason {} ",
|
||||
retryContext.getRetryCount(),
|
||||
fileId,
|
||||
dossierId,
|
||||
fileErrorInfo != null ? fileErrorInfo.getCause() : null);
|
||||
retryContext.getRetryCount(),
|
||||
fileId,
|
||||
dossierId,
|
||||
fileErrorInfo != null ? fileErrorInfo.getCause() : null);
|
||||
fileStatusService.setStatusError(fileId, fileErrorInfo);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public void ocrProcessing(String dossierId, String fileId) {
|
||||
|
||||
public void ocrProcessingUpdateOnly(String fileId) {
|
||||
var fileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
|
||||
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", fileEntity.getDossierId(), fileId, retryContext.getRetryCount());
|
||||
fileStatusService.setStatusOcrProcessing(fileId,
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public void ocrProcessing(String fileId) {
|
||||
|
||||
ocrProcessingUpdateOnly(fileId);
|
||||
|
||||
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
|
||||
throw new ConflictException(String.format("Max Ocr Retries exhausted for dossier %s and file %s with errorCount: %s",
|
||||
dossierId,
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
updatedFileEntity.getDossierId(),
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -458,7 +458,7 @@ public class FileStatusService {
|
||||
|
||||
private void addToOcrQueue(String dossierId, String fileId, int priority) {
|
||||
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
|
||||
message.getMessageProperties().setPriority(priority);
|
||||
return message;
|
||||
});
|
||||
|
||||
@ -1,14 +1,22 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
|
||||
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -20,13 +28,18 @@ public class OCRProcessingMessageReceiver {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final FileStatusService fileStatusService;
|
||||
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE)
|
||||
public void receive(OCRStatusUpdateResponse response) {
|
||||
public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) {
|
||||
|
||||
fileStatusService.updateOCRStatus(response);
|
||||
if (response.isOcrStarted()) {
|
||||
fileStatusProcessingUpdateService.ocrProcessingUpdateOnly(response.getFileId());
|
||||
} else {
|
||||
fileStatusService.updateOCRStatus(response);
|
||||
}
|
||||
|
||||
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE);
|
||||
}
|
||||
@ -34,11 +47,38 @@ public class OCRProcessingMessageReceiver {
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL)
|
||||
public void handleDLQMessage(Message failedMessage) {
|
||||
public void handleOCRStatusUpdateDLQMessage(Message failedMessage) {
|
||||
|
||||
var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class);
|
||||
|
||||
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL);
|
||||
}
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE)
|
||||
public void handleOCRResponseMessage(Message successMessage) throws IOException {
|
||||
|
||||
DocumentRequest ocrResponseMessage = objectMapper.readValue(successMessage.getBody(), DocumentRequest.class);
|
||||
|
||||
log.info("OCR Response received: {}", ocrResponseMessage);
|
||||
fileStatusProcessingUpdateService.ocrSuccessful(ocrResponseMessage.getDossierId(), ocrResponseMessage.getFileId());
|
||||
}
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_DLQ)
|
||||
public void handleOCRDQLMessage(Message failedMessage) throws IOException {
|
||||
|
||||
DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class);
|
||||
|
||||
log.info("OCR DQL received: {}", ocrRequestMessage);
|
||||
String errorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER);
|
||||
OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
|
||||
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
||||
fileStatusProcessingUpdateService.ocrFailed(ocrRequestMessage.getDossierId(),
|
||||
ocrRequestMessage.getFileId(),
|
||||
new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user