CLARI-30 - reworked OCR service integration to work via queue only, deprecated... #372

Merged
timo.bejan.ext merged 2 commits from clari-30 into master 2024-03-05 14:11:13 +01:00
8 changed files with 104 additions and 38 deletions

View File

@ -57,6 +57,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
}
@Deprecated
public void ocrSuccessful(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId);
@ -81,12 +82,14 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
}
@Deprecated
public void ocrProcessing(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
fileStatusProcessingUpdateService.ocrProcessing(dossierId, fileId);
fileStatusProcessingUpdateService.ocrProcessing(fileId);
}
@Deprecated
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
fileStatusProcessingUpdateService.ocrFailed(dossierId,
@ -98,6 +101,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
}
@Deprecated
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId, @RequestBody FileErrorInfo fileErrorInfo) {
fileStatusProcessingUpdateService.ocrFailed(dossierId, fileId, fileErrorInfo);

View File

@ -35,8 +35,11 @@ public class MessagingConfiguration {
public static final String REPORT_RESULT_QUEUE = "reportResultQueue";
public static final String REPORT_RESULT_DLQ = "reportResultDLQ";
public static final String OCR_QUEUE = "ocrQueue";
public static final String OCR_DLQ = "ocrDLQ";
public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
public static final String OCR_DLQ = "ocr_dead_letter_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue";
public static final String INDEXING_QUEUE = "indexingQueue";
public static final String INDEXING_DQL = "indexingDQL";
@ -68,9 +71,6 @@ public class MessagingConfiguration {
public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue";
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_response_dql";
public static final String X_ERROR_INFO_HEADER = "x-error-message";
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
@ -241,7 +241,7 @@ public class MessagingConfiguration {
@Bean
public Queue ocrQueue() {
return QueueBuilder.durable(OCR_QUEUE)
return QueueBuilder.durable(OCR_REQUEST_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", OCR_DLQ)
.withArgument("x-max-priority", 2) // Higher value is higher priority.

View File

@ -15,5 +15,6 @@ public class OCRStatusUpdateResponse {
private int numberOfPagesToOCR;
private int numberOfOCRedPages;
private boolean ocrFinished;
private boolean ocrStarted;
}

View File

@ -1,6 +1,7 @@
package com.iqser.red.service.persistence.management.v1.processor.service;
import org.apache.commons.lang3.StringUtils;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.bind.annotation.RestController;
@ -78,16 +79,16 @@ public class FileStatusProcessingUpdateService {
retryTemplate.execute(retryContext -> {
log.info("Preprocessing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
fileStatusService.setStatusPreProcessing(fileId,
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
return null;
});
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
throw new ConflictException(String.format("Max Processing Retries exhausted for dossier %s and file %s with errorCount: %s",
dossierId,
fileId,
updatedFileEntity.getProcessingErrorCounter()));
dossierId,
fileId,
updatedFileEntity.getProcessingErrorCounter()));
}
}
@ -105,33 +106,52 @@ public class FileStatusProcessingUpdateService {
retryTemplate.execute(retryContext -> {
log.warn("Retrying {} time to set ERROR status for file {} in dossier {} with reason {} ",
retryContext.getRetryCount(),
fileId,
dossierId,
fileErrorInfo != null ? fileErrorInfo.getCause() : null);
retryContext.getRetryCount(),
fileId,
dossierId,
fileErrorInfo != null ? fileErrorInfo.getCause() : null);
fileStatusService.setStatusError(fileId, fileErrorInfo);
return null;
});
}
public void ocrProcessing(String dossierId, String fileId) {
/**
 * Handles a failed OCR request by either giving up on the file or sending it through OCR again.
 * <p>
 * When the file's stored processing error counter is already above the configured maximum number
 * of retries, the failure is recorded via {@link #ocrFailed(String, String, FileErrorInfo)}.
 * Otherwise the file is put (back) into the OCR processing status and a new OCR request is queued.
 *
 * @param dossierId     id of the dossier the file belongs to
 * @param fileId        id of the file whose OCR request failed
 * @param fileErrorInfo error details that are stored with the file if no retry is attempted
 */
public void requeueOCROrMarkFailed(String dossierId, String fileId, FileErrorInfo fileErrorInfo) {

    var currentFile = fileStatusPersistenceService.getStatus(fileId);

    // Retry budget used up: record the error and stop here.
    if (currentFile.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
        ocrFailed(dossierId, fileId, fileErrorInfo);
        return;
    }

    // The counter only grows while the file is already in OCR processing; from any other status it restarts at 0.
    var nextErrorCounter = currentFile.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING)
            ? currentFile.getProcessingErrorCounter() + 1
            : 0;
    fileStatusService.setStatusOcrProcessing(fileId, nextErrorCounter);

    // Re-submit the OCR request with priority 2.
    fileStatusService.addToOcrQueue(dossierId, fileId, 2);
}
public void ocrProcessingUpdateOnly(String fileId) {
var fileEntity = fileStatusPersistenceService.getStatus(fileId);
retryTemplate.execute(retryContext -> {
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", fileEntity.getDossierId(), fileId, retryContext.getRetryCount());
fileStatusService.setStatusOcrProcessing(fileId,
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
return null;
});
}
public void ocrProcessing(String fileId) {
ocrProcessingUpdateOnly(fileId);
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
throw new ConflictException(String.format("Max Ocr Retries exhausted for dossier %s and file %s with errorCount: %s",
dossierId,
fileId,
updatedFileEntity.getProcessingErrorCounter()));
updatedFileEntity.getDossierId(),
fileId,
updatedFileEntity.getProcessingErrorCounter()));
}
}

View File

@ -456,9 +456,9 @@ public class FileStatusService {
}
private void addToOcrQueue(String dossierId, String fileId, int priority) {
public void addToOcrQueue(String dossierId, String fileId, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
message.getMessageProperties().setPriority(priority);
return message;
});

View File

@ -1,14 +1,22 @@
package com.iqser.red.service.persistence.management.v1.processor.service.queue;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@ -20,13 +28,18 @@ public class OCRProcessingMessageReceiver {
private final ObjectMapper objectMapper;
private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE)
public void receive(OCRStatusUpdateResponse response) {
public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) {
fileStatusService.updateOCRStatus(response);
if (response.isOcrStarted()) {
fileStatusProcessingUpdateService.ocrProcessingUpdateOnly(response.getFileId());
} else {
fileStatusService.updateOCRStatus(response);
}
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE);
}
@ -34,11 +47,38 @@ public class OCRProcessingMessageReceiver {
@SneakyThrows
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL)
public void handleDLQMessage(Message failedMessage) {
public void handleOCRStatusUpdateDLQMessage(Message failedMessage) {
var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class);
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL);
}
/**
 * Listener for {@link MessagingConfiguration#OCR_RESPONSE_QUEUE}.
 * <p>
 * Deserializes the message body into a {@link DocumentRequest}, logs it and reports the referenced
 * dossier/file pair as successfully OCRed to the {@link FileStatusProcessingUpdateService}.
 *
 * @param successMessage raw AMQP message whose body is a JSON-serialized {@link DocumentRequest}
 * @throws IOException if the message body cannot be read as a {@link DocumentRequest}
 */
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE)
public void handleOCRResponseMessage(Message successMessage) throws IOException {

    byte[] payload = successMessage.getBody();
    DocumentRequest processedDocument = objectMapper.readValue(payload, DocumentRequest.class);
    log.info("OCR Response received: {}", processedDocument);

    var dossierId = processedDocument.getDossierId();
    var fileId = processedDocument.getFileId();
    fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId);
}
/**
 * Listener for the OCR dead letter queue ({@link MessagingConfiguration#OCR_DLQ}).
 * <p>
 * Deserializes the dead-lettered {@link DocumentRequest}, reads the optional error headers
 * ({@link MessagingConfiguration#X_ERROR_INFO_HEADER} and
 * {@link MessagingConfiguration#X_ERROR_INFO_TIMESTAMP_HEADER}) and hands the request over to
 * {@link FileStatusProcessingUpdateService#requeueOCROrMarkFailed}.
 *
 * @param failedMessage raw AMQP message whose body is a JSON-serialized {@link DocumentRequest}
 * @throws IOException if the message body cannot be read as a {@link DocumentRequest}
 */
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_DLQ)
public void handleOCRDQLMessage(Message failedMessage) throws IOException {

    DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class);
    log.info("OCR DQL received: {}", ocrRequestMessage);

    // getHeader(..) is an unchecked generic cast. Assigning its result directly to String/OffsetDateTime
    // throws a ClassCastException as soon as the header arrives as a different type, which would make
    // this dead-letter handler itself fail. Read the headers untyped and convert explicitly instead.
    Object rawErrorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER);
    String errorMessage = rawErrorMessage != null ? rawErrorMessage.toString() : null;

    Object rawTimestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
    OffsetDateTime timestamp = toErrorTimestamp(rawTimestamp);

    fileStatusProcessingUpdateService.requeueOCROrMarkFailed(ocrRequestMessage.getDossierId(),
            ocrRequestMessage.getFileId(),
            new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp));
}


/**
 * Converts the value of the error timestamp header into an {@link OffsetDateTime}.
 * <p>
 * Accepts an {@link OffsetDateTime}, a {@link java.util.Date} or ISO-8601 offset date-time text.
 * A missing, unparsable or otherwise unsupported value falls back to the current time truncated
 * to milliseconds, which is also what a missing header resolved to before.
 *
 * @param rawTimestamp header value as delivered by the broker, may be {@code null}
 * @return the timestamp to store with the error, never {@code null}
 */
private static OffsetDateTime toErrorTimestamp(Object rawTimestamp) {

    if (rawTimestamp instanceof OffsetDateTime) {
        return (OffsetDateTime) rawTimestamp;
    }
    if (rawTimestamp instanceof java.util.Date) {
        return OffsetDateTime.ofInstant(((java.util.Date) rawTimestamp).toInstant(), java.time.ZoneId.systemDefault());
    }
    if (rawTimestamp instanceof CharSequence) {
        try {
            return OffsetDateTime.parse((CharSequence) rawTimestamp);
        } catch (java.time.format.DateTimeParseException e) {
            log.warn("Could not parse error timestamp header value '{}', using current time instead", rawTimestamp, e);
        }
    }
    return OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
}
}

View File

@ -39,18 +39,18 @@ fforesight:
auth-server-url: 'http://localhost:8080'
jobs:
enabled: true
datasource:
url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:25432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true
driverClassName: org.postgresql.Driver
username: ${PSQL_USERNAME:tenantmanager}
password: ${PSQL_PASSWORD:r3dact3d}
platform: org.hibernate.dialect.PostgreSQL95Dialect
hikari:
maximumPoolSize: 2
data-source-properties:
cachePrepStmts: true
prepStmtCacheSize: 1000
prepStmtCacheSqlLimit: 2048
# datasource:
# url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:5432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true
# driverClassName: org.postgresql.Driver
# username: ${PSQL_USERNAME:tenantmanager}
# password: ${PSQL_PASSWORD:r3dact3d}
# platform: org.hibernate.dialect.PostgreSQL95Dialect
# hikari:
# maximumPoolSize: 2
# data-source-properties:
# cachePrepStmts: true
# prepStmtCacheSize: 1000
# prepStmtCacheSqlLimit: 2048
management:
tracing:

View File

@ -125,6 +125,7 @@ fforesight:
keycloak:
# NOTE(review): '/redaction-gateway-v1/**' looks like a catch-all for every path below the gateway prefix,
# which would make the narrower '/redaction-gateway-v1/...' entries in this list redundant. Presumably every
# listed path is exempt from Keycloak handling; confirm that exempting the whole gateway is intended here
# and not a local-debugging leftover.
ignored-endpoints: [ '/redaction-gateway-v1','/actuator/health/**', '/redaction-gateway-v1/async/download/with-ott/**',
'/internal-api/**', '/redaction-gateway-v1/docs/swagger-ui',
'/redaction-gateway-v1/**',
'/redaction-gateway-v1/docs/**','/redaction-gateway-v1/docs',
'/api', '/api/','/api/docs/**','/api/docs','/api/docs/swagger-ui' ]
enabled: true