CLARI-30 - reworked OCR service integration to work via queue only, depercated... #372
@ -57,6 +57,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrSuccessful(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId);
|
||||
@ -81,12 +82,14 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrProcessing(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrProcessing(dossierId, fileId);
|
||||
fileStatusProcessingUpdateService.ocrProcessing(fileId);
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrFailed(dossierId,
|
||||
@ -98,6 +101,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId, @RequestBody FileErrorInfo fileErrorInfo) {
|
||||
|
||||
fileStatusProcessingUpdateService.ocrFailed(dossierId, fileId, fileErrorInfo);
|
||||
|
||||
@ -35,8 +35,11 @@ public class MessagingConfiguration {
|
||||
public static final String REPORT_RESULT_QUEUE = "reportResultQueue";
|
||||
public static final String REPORT_RESULT_DLQ = "reportResultDLQ";
|
||||
|
||||
public static final String OCR_QUEUE = "ocrQueue";
|
||||
public static final String OCR_DLQ = "ocrDLQ";
|
||||
public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
|
||||
public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
|
||||
public static final String OCR_DLQ = "ocr_dead_letter_queue";
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue";
|
||||
|
||||
public static final String INDEXING_QUEUE = "indexingQueue";
|
||||
public static final String INDEXING_DQL = "indexingDQL";
|
||||
@ -68,9 +71,6 @@ public class MessagingConfiguration {
|
||||
public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue";
|
||||
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue";
|
||||
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
|
||||
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_response_dql";
|
||||
|
||||
public static final String X_ERROR_INFO_HEADER = "x-error-message";
|
||||
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
|
||||
|
||||
@ -241,7 +241,7 @@ public class MessagingConfiguration {
|
||||
@Bean
|
||||
public Queue ocrQueue() {
|
||||
|
||||
return QueueBuilder.durable(OCR_QUEUE)
|
||||
return QueueBuilder.durable(OCR_REQUEST_QUEUE)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", OCR_DLQ)
|
||||
.withArgument("x-max-priority", 2) // Higher value is higher priority.
|
||||
|
||||
@ -15,5 +15,6 @@ public class OCRStatusUpdateResponse {
|
||||
private int numberOfPagesToOCR;
|
||||
private int numberOfOCRedPages;
|
||||
private boolean ocrFinished;
|
||||
private boolean ocrStarted;
|
||||
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@ -78,16 +79,16 @@ public class FileStatusProcessingUpdateService {
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.info("Preprocessing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
|
||||
fileStatusService.setStatusPreProcessing(fileId,
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
return null;
|
||||
});
|
||||
|
||||
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
|
||||
throw new ConflictException(String.format("Max Processing Retries exhausted for dossier %s and file %s with errorCount: %s",
|
||||
dossierId,
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
dossierId,
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,33 +106,52 @@ public class FileStatusProcessingUpdateService {
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.warn("Retrying {} time to set ERROR status for file {} in dossier {} with reason {} ",
|
||||
retryContext.getRetryCount(),
|
||||
fileId,
|
||||
dossierId,
|
||||
fileErrorInfo != null ? fileErrorInfo.getCause() : null);
|
||||
retryContext.getRetryCount(),
|
||||
fileId,
|
||||
dossierId,
|
||||
fileErrorInfo != null ? fileErrorInfo.getCause() : null);
|
||||
fileStatusService.setStatusError(fileId, fileErrorInfo);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public void ocrProcessing(String dossierId, String fileId) {
|
||||
public void requeueOCROrMarkFailed(String dossierId, String fileId, FileErrorInfo fileErrorInfo) {
|
||||
|
||||
var fileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
if (fileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
|
||||
ocrFailed(dossierId, fileId, fileErrorInfo);
|
||||
} else {
|
||||
fileStatusService.setStatusOcrProcessing(fileId,
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
fileStatusService.addToOcrQueue(dossierId, fileId, 2);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void ocrProcessingUpdateOnly(String fileId) {
|
||||
|
||||
var fileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
|
||||
retryTemplate.execute(retryContext -> {
|
||||
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
|
||||
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", fileEntity.getDossierId(), fileId, retryContext.getRetryCount());
|
||||
fileStatusService.setStatusOcrProcessing(fileId,
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public void ocrProcessing(String fileId) {
|
||||
|
||||
ocrProcessingUpdateOnly(fileId);
|
||||
|
||||
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
|
||||
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
|
||||
throw new ConflictException(String.format("Max Ocr Retries exhausted for dossier %s and file %s with errorCount: %s",
|
||||
dossierId,
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
updatedFileEntity.getDossierId(),
|
||||
fileId,
|
||||
updatedFileEntity.getProcessingErrorCounter()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -456,9 +456,9 @@ public class FileStatusService {
|
||||
}
|
||||
|
||||
|
||||
private void addToOcrQueue(String dossierId, String fileId, int priority) {
|
||||
public void addToOcrQueue(String dossierId, String fileId, int priority) {
|
||||
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
|
||||
message.getMessageProperties().setPriority(priority);
|
||||
return message;
|
||||
});
|
||||
|
||||
@ -1,14 +1,22 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
|
||||
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -20,13 +28,18 @@ public class OCRProcessingMessageReceiver {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final FileStatusService fileStatusService;
|
||||
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE)
|
||||
public void receive(OCRStatusUpdateResponse response) {
|
||||
public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) {
|
||||
|
||||
fileStatusService.updateOCRStatus(response);
|
||||
if (response.isOcrStarted()) {
|
||||
fileStatusProcessingUpdateService.ocrProcessingUpdateOnly(response.getFileId());
|
||||
} else {
|
||||
fileStatusService.updateOCRStatus(response);
|
||||
}
|
||||
|
||||
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE);
|
||||
}
|
||||
@ -34,11 +47,38 @@ public class OCRProcessingMessageReceiver {
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL)
|
||||
public void handleDLQMessage(Message failedMessage) {
|
||||
public void handleOCRStatusUpdateDLQMessage(Message failedMessage) {
|
||||
|
||||
var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class);
|
||||
|
||||
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL);
|
||||
}
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE)
|
||||
public void handleOCRResponseMessage(Message successMessage) throws IOException {
|
||||
|
||||
DocumentRequest ocrResponseMessage = objectMapper.readValue(successMessage.getBody(), DocumentRequest.class);
|
||||
|
||||
log.info("OCR Response received: {}", ocrResponseMessage);
|
||||
fileStatusProcessingUpdateService.ocrSuccessful(ocrResponseMessage.getDossierId(), ocrResponseMessage.getFileId());
|
||||
}
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_DLQ)
|
||||
public void handleOCRDQLMessage(Message failedMessage) throws IOException {
|
||||
|
||||
DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class);
|
||||
|
||||
log.info("OCR DQL received: {}", ocrRequestMessage);
|
||||
String errorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER);
|
||||
OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
|
||||
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
||||
fileStatusProcessingUpdateService.requeueOCROrMarkFailed(ocrRequestMessage.getDossierId(),
|
||||
ocrRequestMessage.getFileId(),
|
||||
new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -39,18 +39,18 @@ fforesight:
|
||||
auth-server-url: 'http://localhost:8080'
|
||||
jobs:
|
||||
enabled: true
|
||||
datasource:
|
||||
url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:25432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true
|
||||
driverClassName: org.postgresql.Driver
|
||||
username: ${PSQL_USERNAME:tenantmanager}
|
||||
password: ${PSQL_PASSWORD:r3dact3d}
|
||||
platform: org.hibernate.dialect.PostgreSQL95Dialect
|
||||
hikari:
|
||||
maximumPoolSize: 2
|
||||
data-source-properties:
|
||||
cachePrepStmts: true
|
||||
prepStmtCacheSize: 1000
|
||||
prepStmtCacheSqlLimit: 2048
|
||||
# datasource:
|
||||
# url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:5432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true
|
||||
# driverClassName: org.postgresql.Driver
|
||||
# username: ${PSQL_USERNAME:tenantmanager}
|
||||
# password: ${PSQL_PASSWORD:r3dact3d}
|
||||
# platform: org.hibernate.dialect.PostgreSQL95Dialect
|
||||
# hikari:
|
||||
# maximumPoolSize: 2
|
||||
# data-source-properties:
|
||||
# cachePrepStmts: true
|
||||
# prepStmtCacheSize: 1000
|
||||
# prepStmtCacheSqlLimit: 2048
|
||||
|
||||
management:
|
||||
tracing:
|
||||
|
||||
@ -125,6 +125,7 @@ fforesight:
|
||||
keycloak:
|
||||
ignored-endpoints: [ '/redaction-gateway-v1','/actuator/health/**', '/redaction-gateway-v1/async/download/with-ott/**',
|
||||
'/internal-api/**', '/redaction-gateway-v1/docs/swagger-ui',
|
||||
'/redaction-gateway-v1/**',
|
||||
'/redaction-gateway-v1/docs/**','/redaction-gateway-v1/docs',
|
||||
'/api', '/api/','/api/docs/**','/api/docs','/api/docs/swagger-ui' ]
|
||||
enabled: true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user