Merge branch 'clari-30' into 'master'

CLARI-30 - reworked OCR service integration to work via queue only, depercated...

See merge request redactmanager/persistence-service!372
This commit is contained in:
Timo Bejan 2024-03-05 14:11:12 +01:00
commit 81b66236f9
8 changed files with 104 additions and 38 deletions

View File

@ -57,6 +57,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
} }
@Deprecated
public void ocrSuccessful(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) { public void ocrSuccessful(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId); fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId);
@ -81,12 +82,14 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
} }
@Deprecated
public void ocrProcessing(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) { public void ocrProcessing(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
fileStatusProcessingUpdateService.ocrProcessing(dossierId, fileId); fileStatusProcessingUpdateService.ocrProcessing(fileId);
} }
@Deprecated
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) { public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) {
fileStatusProcessingUpdateService.ocrFailed(dossierId, fileStatusProcessingUpdateService.ocrFailed(dossierId,
@ -98,6 +101,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP
} }
@Deprecated
public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId, @RequestBody FileErrorInfo fileErrorInfo) { public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId, @RequestBody FileErrorInfo fileErrorInfo) {
fileStatusProcessingUpdateService.ocrFailed(dossierId, fileId, fileErrorInfo); fileStatusProcessingUpdateService.ocrFailed(dossierId, fileId, fileErrorInfo);

View File

@ -35,8 +35,11 @@ public class MessagingConfiguration {
public static final String REPORT_RESULT_QUEUE = "reportResultQueue"; public static final String REPORT_RESULT_QUEUE = "reportResultQueue";
public static final String REPORT_RESULT_DLQ = "reportResultDLQ"; public static final String REPORT_RESULT_DLQ = "reportResultDLQ";
public static final String OCR_QUEUE = "ocrQueue"; public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
public static final String OCR_DLQ = "ocrDLQ"; public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
public static final String OCR_DLQ = "ocr_dead_letter_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue";
public static final String INDEXING_QUEUE = "indexingQueue"; public static final String INDEXING_QUEUE = "indexingQueue";
public static final String INDEXING_DQL = "indexingDQL"; public static final String INDEXING_DQL = "indexingDQL";
@ -68,9 +71,6 @@ public class MessagingConfiguration {
public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue"; public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue";
public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue"; public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_response_dql";
public static final String X_ERROR_INFO_HEADER = "x-error-message"; public static final String X_ERROR_INFO_HEADER = "x-error-message";
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp"; public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
@ -241,7 +241,7 @@ public class MessagingConfiguration {
@Bean @Bean
public Queue ocrQueue() { public Queue ocrQueue() {
return QueueBuilder.durable(OCR_QUEUE) return QueueBuilder.durable(OCR_REQUEST_QUEUE)
.withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", OCR_DLQ) .withArgument("x-dead-letter-routing-key", OCR_DLQ)
.withArgument("x-max-priority", 2) // Higher value is higher priority. .withArgument("x-max-priority", 2) // Higher value is higher priority.

View File

@ -15,5 +15,6 @@ public class OCRStatusUpdateResponse {
private int numberOfPagesToOCR; private int numberOfPagesToOCR;
private int numberOfOCRedPages; private int numberOfOCRedPages;
private boolean ocrFinished; private boolean ocrFinished;
private boolean ocrStarted;
} }

View File

@ -1,6 +1,7 @@
package com.iqser.red.service.persistence.management.v1.processor.service; package com.iqser.red.service.persistence.management.v1.processor.service;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@ -78,16 +79,16 @@ public class FileStatusProcessingUpdateService {
retryTemplate.execute(retryContext -> { retryTemplate.execute(retryContext -> {
log.info("Preprocessing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount()); log.info("Preprocessing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount());
fileStatusService.setStatusPreProcessing(fileId, fileStatusService.setStatusPreProcessing(fileId,
fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
return null; return null;
}); });
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId); var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) { if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
throw new ConflictException(String.format("Max Processing Retries exhausted for dossier %s and file %s with errorCount: %s", throw new ConflictException(String.format("Max Processing Retries exhausted for dossier %s and file %s with errorCount: %s",
dossierId, dossierId,
fileId, fileId,
updatedFileEntity.getProcessingErrorCounter())); updatedFileEntity.getProcessingErrorCounter()));
} }
} }
@ -105,33 +106,52 @@ public class FileStatusProcessingUpdateService {
retryTemplate.execute(retryContext -> { retryTemplate.execute(retryContext -> {
log.warn("Retrying {} time to set ERROR status for file {} in dossier {} with reason {} ", log.warn("Retrying {} time to set ERROR status for file {} in dossier {} with reason {} ",
retryContext.getRetryCount(), retryContext.getRetryCount(),
fileId, fileId,
dossierId, dossierId,
fileErrorInfo != null ? fileErrorInfo.getCause() : null); fileErrorInfo != null ? fileErrorInfo.getCause() : null);
fileStatusService.setStatusError(fileId, fileErrorInfo); fileStatusService.setStatusError(fileId, fileErrorInfo);
return null; return null;
}); });
} }
public void ocrProcessing(String dossierId, String fileId) { public void requeueOCROrMarkFailed(String dossierId, String fileId, FileErrorInfo fileErrorInfo) {
var fileEntity = fileStatusPersistenceService.getStatus(fileId);
if (fileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
ocrFailed(dossierId, fileId, fileErrorInfo);
} else {
fileStatusService.setStatusOcrProcessing(fileId,
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
fileStatusService.addToOcrQueue(dossierId, fileId, 2);
}
}
public void ocrProcessingUpdateOnly(String fileId) {
var fileEntity = fileStatusPersistenceService.getStatus(fileId); var fileEntity = fileStatusPersistenceService.getStatus(fileId);
retryTemplate.execute(retryContext -> { retryTemplate.execute(retryContext -> {
log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount()); log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", fileEntity.getDossierId(), fileId, retryContext.getRetryCount());
fileStatusService.setStatusOcrProcessing(fileId, fileStatusService.setStatusOcrProcessing(fileId,
fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0);
return null; return null;
}); });
}
public void ocrProcessing(String fileId) {
ocrProcessingUpdateOnly(fileId);
var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId); var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId);
if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) { if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) {
throw new ConflictException(String.format("Max Ocr Retries exhausted for dossier %s and file %s with errorCount: %s", throw new ConflictException(String.format("Max Ocr Retries exhausted for dossier %s and file %s with errorCount: %s",
dossierId, updatedFileEntity.getDossierId(),
fileId, fileId,
updatedFileEntity.getProcessingErrorCounter())); updatedFileEntity.getProcessingErrorCounter()));
} }
} }

View File

@ -456,9 +456,9 @@ public class FileStatusService {
} }
private void addToOcrQueue(String dossierId, String fileId, int priority) { public void addToOcrQueue(String dossierId, String fileId, int priority) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_QUEUE, new DocumentRequest(dossierId, fileId), message -> { rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId), message -> {
message.getMessageProperties().setPriority(priority); message.getMessageProperties().setPriority(priority);
return message; return message;
}); });

View File

@ -1,14 +1,22 @@
package com.iqser.red.service.persistence.management.v1.processor.service.queue; package com.iqser.red.service.persistence.management.v1.processor.service.queue;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration;
import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse; import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService;
import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService; import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService;
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -20,13 +28,18 @@ public class OCRProcessingMessageReceiver {
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final FileStatusService fileStatusService; private final FileStatusService fileStatusService;
private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService;
@SneakyThrows @SneakyThrows
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE) @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE)
public void receive(OCRStatusUpdateResponse response) { public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) {
fileStatusService.updateOCRStatus(response); if (response.isOcrStarted()) {
fileStatusProcessingUpdateService.ocrProcessingUpdateOnly(response.getFileId());
} else {
fileStatusService.updateOCRStatus(response);
}
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE); log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE);
} }
@ -34,11 +47,38 @@ public class OCRProcessingMessageReceiver {
@SneakyThrows @SneakyThrows
@RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL) @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL)
public void handleDLQMessage(Message failedMessage) { public void handleOCRStatusUpdateDLQMessage(Message failedMessage) {
var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class); var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class);
log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL); log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL);
} }
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE)
public void handleOCRResponseMessage(Message successMessage) throws IOException {
DocumentRequest ocrResponseMessage = objectMapper.readValue(successMessage.getBody(), DocumentRequest.class);
log.info("OCR Response received: {}", ocrResponseMessage);
fileStatusProcessingUpdateService.ocrSuccessful(ocrResponseMessage.getDossierId(), ocrResponseMessage.getFileId());
}
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_DLQ)
public void handleOCRDQLMessage(Message failedMessage) throws IOException {
DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class);
log.info("OCR DQL received: {}", ocrRequestMessage);
String errorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER);
OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
fileStatusProcessingUpdateService.requeueOCROrMarkFailed(ocrRequestMessage.getDossierId(),
ocrRequestMessage.getFileId(),
new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp));
}
} }

View File

@ -39,18 +39,18 @@ fforesight:
auth-server-url: 'http://localhost:8080' auth-server-url: 'http://localhost:8080'
jobs: jobs:
enabled: true enabled: true
datasource: # datasource:
url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:25432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true # url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:5432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true
driverClassName: org.postgresql.Driver # driverClassName: org.postgresql.Driver
username: ${PSQL_USERNAME:tenantmanager} # username: ${PSQL_USERNAME:tenantmanager}
password: ${PSQL_PASSWORD:r3dact3d} # password: ${PSQL_PASSWORD:r3dact3d}
platform: org.hibernate.dialect.PostgreSQL95Dialect # platform: org.hibernate.dialect.PostgreSQL95Dialect
hikari: # hikari:
maximumPoolSize: 2 # maximumPoolSize: 2
data-source-properties: # data-source-properties:
cachePrepStmts: true # cachePrepStmts: true
prepStmtCacheSize: 1000 # prepStmtCacheSize: 1000
prepStmtCacheSqlLimit: 2048 # prepStmtCacheSqlLimit: 2048
management: management:
tracing: tracing:

View File

@ -125,6 +125,7 @@ fforesight:
keycloak: keycloak:
ignored-endpoints: [ '/redaction-gateway-v1','/actuator/health/**', '/redaction-gateway-v1/async/download/with-ott/**', ignored-endpoints: [ '/redaction-gateway-v1','/actuator/health/**', '/redaction-gateway-v1/async/download/with-ott/**',
'/internal-api/**', '/redaction-gateway-v1/docs/swagger-ui', '/internal-api/**', '/redaction-gateway-v1/docs/swagger-ui',
'/redaction-gateway-v1/**',
'/redaction-gateway-v1/docs/**','/redaction-gateway-v1/docs', '/redaction-gateway-v1/docs/**','/redaction-gateway-v1/docs',
'/api', '/api/','/api/docs/**','/api/docs','/api/docs/swagger-ui' ] '/api', '/api/','/api/docs/**','/api/docs','/api/docs/swagger-ui' ]
enabled: true enabled: true