From d849859def58ad600f425fd21660bcb871bd4c2f Mon Sep 17 00:00:00 2001 From: Timo Bejan Date: Mon, 4 Mar 2024 11:46:27 +0200 Subject: [PATCH 1/2] CLARI-30 - reworked OCR service integration to work via queue only, depercated old rest endpoints. Renamed queues to naming convetion. --- ...tusProcessingUpdateInternalController.java | 6 ++- .../configuration/MessagingConfiguration.java | 12 ++--- .../model/OCRStatusUpdateResponse.java | 1 + .../FileStatusProcessingUpdateService.java | 37 ++++++++------- .../processor/service/FileStatusService.java | 2 +- .../queue/OCRProcessingMessageReceiver.java | 46 +++++++++++++++++-- 6 files changed, 77 insertions(+), 27 deletions(-) diff --git a/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java b/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java index bdf151f1a..7d530b5d5 100644 --- a/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java +++ b/persistence-service-v1/persistence-service-internal-api-impl-v1/src/main/java/com/iqser/red/service/persistence/v1/internal/api/controller/FileStatusProcessingUpdateInternalController.java @@ -57,6 +57,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP } + @Deprecated public void ocrSuccessful(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) { fileStatusProcessingUpdateService.ocrSuccessful(dossierId, fileId); @@ -81,12 +82,14 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP } + @Deprecated public void ocrProcessing(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) { - fileStatusProcessingUpdateService.ocrProcessing(dossierId, fileId); + fileStatusProcessingUpdateService.ocrProcessing(fileId); } + @Deprecated public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId) { fileStatusProcessingUpdateService.ocrFailed(dossierId, @@ -98,6 +101,7 @@ public class FileStatusProcessingUpdateInternalController implements FileStatusP } + @Deprecated public void ocrFailed(@PathVariable(DOSSIER_ID_PARAM) String dossierId, @PathVariable(FILE_ID) String fileId, @RequestBody FileErrorInfo fileErrorInfo) { fileStatusProcessingUpdateService.ocrFailed(dossierId, fileId, fileErrorInfo); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java index 0b7d74425..73e52b64c 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/configuration/MessagingConfiguration.java @@ -35,8 +35,11 @@ public class MessagingConfiguration { public static final String REPORT_RESULT_QUEUE = "reportResultQueue"; public static final String REPORT_RESULT_DLQ = "reportResultDLQ"; - public static final String OCR_QUEUE = "ocrQueue"; - public static final String OCR_DLQ = "ocrDLQ"; + public static final String OCR_REQUEST_QUEUE = "ocr_request_queue"; + public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue"; + public static final String OCR_DLQ = "ocr_dead_letter_queue"; + public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue"; + public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_dead_letter_queue"; public static final String INDEXING_QUEUE = "indexingQueue"; public static final String INDEXING_DQL = "indexingDQL"; @@ -68,9 +71,6 @@ public class MessagingConfiguration { public static final String VISUAL_LAYOUT_DLQ = "visual_layout_parsing_service_dead_letter_queue"; public static final String ANALYSIS_FLAG_CALCULATION_QUEUE = "analysis_flag_calculation_queue"; - public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue"; - public static final String OCR_STATUS_UPDATE_RESPONSE_DQL = "ocr_status_update_response_dql"; - public static final String X_ERROR_INFO_HEADER = "x-error-message"; public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp"; @@ -241,7 +241,7 @@ public class MessagingConfiguration { @Bean public Queue ocrQueue() { - return QueueBuilder.durable(OCR_QUEUE) + return QueueBuilder.durable(OCR_REQUEST_QUEUE) .withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-routing-key", OCR_DLQ) .withArgument("x-max-priority", 2) // Higher value is higher priority. diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/OCRStatusUpdateResponse.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/OCRStatusUpdateResponse.java index b874938c5..41c0d88e4 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/OCRStatusUpdateResponse.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/model/OCRStatusUpdateResponse.java @@ -15,5 +15,6 @@ public class OCRStatusUpdateResponse { private int numberOfPagesToOCR; private int numberOfOCRedPages; private boolean ocrFinished; + private boolean ocrStarted; } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java index 829cfd93d..2e5151f5c 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java @@ -1,6 +1,7 @@ package com.iqser.red.service.persistence.management.v1.processor.service; import org.apache.commons.lang3.StringUtils; + import org.springframework.retry.support.RetryTemplate; import org.springframework.web.bind.annotation.RestController; @@ -78,16 +79,16 @@ public class FileStatusProcessingUpdateService { retryTemplate.execute(retryContext -> { log.info("Preprocessing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount()); fileStatusService.setStatusPreProcessing(fileId, - fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); + fileEntity.getProcessingStatus().equals(ProcessingStatus.PRE_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); return null; }); var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId); if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) { throw new ConflictException(String.format("Max Processing Retries exhausted for dossier %s and file %s with errorCount: %s", - dossierId, - fileId, - updatedFileEntity.getProcessingErrorCounter())); + dossierId, + fileId, + updatedFileEntity.getProcessingErrorCounter())); } } @@ -105,33 +106,37 @@ public class FileStatusProcessingUpdateService { retryTemplate.execute(retryContext -> { log.warn("Retrying {} time to set ERROR status for file {} in dossier {} with reason {} ", - retryContext.getRetryCount(), - fileId, - dossierId, - fileErrorInfo != null ? fileErrorInfo.getCause() : null); + retryContext.getRetryCount(), + fileId, + dossierId, + fileErrorInfo != null ? fileErrorInfo.getCause() : null); fileStatusService.setStatusError(fileId, fileErrorInfo); return null; }); } - - public void ocrProcessing(String dossierId, String fileId) { - + public void ocrProcessingUpdateOnly(String fileId) { var fileEntity = fileStatusPersistenceService.getStatus(fileId); retryTemplate.execute(retryContext -> { - log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", dossierId, fileId, retryContext.getRetryCount()); + log.info("Ocr processing dossier {} and file {}, Attempt to update status: {}", fileEntity.getDossierId(), fileId, retryContext.getRetryCount()); fileStatusService.setStatusOcrProcessing(fileId, - fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); + fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); return null; }); + } + + + public void ocrProcessing(String fileId) { + + ocrProcessingUpdateOnly(fileId); var updatedFileEntity = fileStatusPersistenceService.getStatus(fileId); if (updatedFileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) { throw new ConflictException(String.format("Max Ocr Retries exhausted for dossier %s and file %s with errorCount: %s", - dossierId, - fileId, - updatedFileEntity.getProcessingErrorCounter())); + updatedFileEntity.getDossierId(), + fileId, + updatedFileEntity.getProcessingErrorCounter())); } } diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java index 06117fde5..50152ea51 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java @@ -458,7 +458,7 @@ public class FileStatusService { private void addToOcrQueue(String dossierId, String fileId, int priority) { - rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_QUEUE, new DocumentRequest(dossierId, fileId), message -> { + rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId), message -> { message.getMessageProperties().setPriority(priority); return message; }); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java index 9fa4a0d6e..ef9168309 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java @@ -1,14 +1,22 @@ package com.iqser.red.service.persistence.management.v1.processor.service.queue; +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; + import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; import com.iqser.red.service.persistence.management.v1.processor.configuration.MessagingConfiguration; import com.iqser.red.service.persistence.management.v1.processor.model.OCRStatusUpdateResponse; +import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusProcessingUpdateService; import com.iqser.red.service.persistence.management.v1.processor.service.FileStatusService; +import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo; +import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -20,13 +28,18 @@ public class OCRProcessingMessageReceiver { private final ObjectMapper objectMapper; private final FileStatusService fileStatusService; + private final FileStatusProcessingUpdateService fileStatusProcessingUpdateService; @SneakyThrows @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE) - public void receive(OCRStatusUpdateResponse response) { + public void handleOCRStatusUpdateMessage(OCRStatusUpdateResponse response) { - fileStatusService.updateOCRStatus(response); + if (response.isOcrStarted()) { + fileStatusProcessingUpdateService.ocrProcessingUpdateOnly(response.getFileId()); + } else { + fileStatusService.updateOCRStatus(response); + } log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE); } @@ -34,11 +47,38 @@ public class OCRProcessingMessageReceiver { @SneakyThrows @RabbitListener(queues = MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL) - public void handleDLQMessage(Message failedMessage) { + public void handleOCRStatusUpdateDLQMessage(Message failedMessage) { var response = objectMapper.readValue(failedMessage.getBody(), OCRStatusUpdateResponse.class); log.info("Received message {} in {}", response, MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_DQL); } + + @RabbitHandler + @RabbitListener(queues = MessagingConfiguration.OCR_RESPONSE_QUEUE) + public void handleOCRResponseMessage(Message successMessage) throws IOException { + + DocumentRequest ocrResponseMessage = objectMapper.readValue(successMessage.getBody(), DocumentRequest.class); + + log.info("OCR Response received: {}", ocrResponseMessage); + fileStatusProcessingUpdateService.ocrSuccessful(ocrResponseMessage.getDossierId(), ocrResponseMessage.getFileId()); + } + + + @RabbitHandler + @RabbitListener(queues = MessagingConfiguration.OCR_DLQ) + public void handleOCRDQLMessage(Message failedMessage) throws IOException { + + DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class); + + log.info("OCR DQL received: {}", ocrRequestMessage); + String errorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER); + OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER); + timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS); + fileStatusProcessingUpdateService.ocrFailed(ocrRequestMessage.getDossierId(), + ocrRequestMessage.getFileId(), + new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp)); + } + } -- 2.47.2 From 47a9bc034343b4dd3b5048f5a3d7f3ff16801d7e Mon Sep 17 00:00:00 2001 From: Timo Bejan Date: Tue, 5 Mar 2024 10:48:45 +0200 Subject: [PATCH 2/2] Retry ocr --- .../FileStatusProcessingUpdateService.java | 15 ++++++++++++ .../processor/service/FileStatusService.java | 2 +- .../queue/OCRProcessingMessageReceiver.java | 2 +- .../src/main/resources/application-dev.yaml | 24 +++++++++---------- .../src/main/resources/application.yaml | 1 + 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java index 2e5151f5c..1ff6dc088 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusProcessingUpdateService.java @@ -115,7 +115,22 @@ public class FileStatusProcessingUpdateService { }); } + + public void requeueOCROrMarkFailed(String dossierId, String fileId, FileErrorInfo fileErrorInfo) { + + var fileEntity = fileStatusPersistenceService.getStatus(fileId); + if (fileEntity.getProcessingErrorCounter() > settings.getMaxErrorRetries()) { + ocrFailed(dossierId, fileId, fileErrorInfo); + } else { + fileStatusService.setStatusOcrProcessing(fileId, + fileEntity.getProcessingStatus().equals(ProcessingStatus.OCR_PROCESSING) ? fileEntity.getProcessingErrorCounter() + 1 : 0); + fileStatusService.addToOcrQueue(dossierId, fileId, 2); + } + } + + public void ocrProcessingUpdateOnly(String fileId) { + var fileEntity = fileStatusPersistenceService.getStatus(fileId); retryTemplate.execute(retryContext -> { diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java index 50152ea51..8d5cb009c 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/FileStatusService.java @@ -456,7 +456,7 @@ public class FileStatusService { } - private void addToOcrQueue(String dossierId, String fileId, int priority) { + public void addToOcrQueue(String dossierId, String fileId, int priority) { rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_REQUEST_QUEUE, new DocumentRequest(dossierId, fileId), message -> { message.getMessageProperties().setPriority(priority); diff --git a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java index ef9168309..aae0f6861 100644 --- a/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java +++ b/persistence-service-v1/persistence-service-processor-v1/src/main/java/com/iqser/red/service/persistence/management/v1/processor/service/queue/OCRProcessingMessageReceiver.java @@ -76,7 +76,7 @@ public class OCRProcessingMessageReceiver { String errorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER); OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER); timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS); - fileStatusProcessingUpdateService.ocrFailed(ocrRequestMessage.getDossierId(), + fileStatusProcessingUpdateService.requeueOCROrMarkFailed(ocrRequestMessage.getDossierId(), ocrRequestMessage.getFileId(), new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp)); } diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/resources/application-dev.yaml b/persistence-service-v1/persistence-service-server-v1/src/main/resources/application-dev.yaml index 369f33352..ca4607034 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/main/resources/application-dev.yaml +++ b/persistence-service-v1/persistence-service-server-v1/src/main/resources/application-dev.yaml @@ -39,18 +39,18 @@ fforesight: auth-server-url: 'http://localhost:8080' jobs: enabled: true - datasource: - url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:25432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true - driverClassName: org.postgresql.Driver - username: ${PSQL_USERNAME:tenantmanager} - password: ${PSQL_PASSWORD:r3dact3d} - platform: org.hibernate.dialect.PostgreSQL95Dialect - hikari: - maximumPoolSize: 2 - data-source-properties: - cachePrepStmts: true - prepStmtCacheSize: 1000 - prepStmtCacheSqlLimit: 2048 +# datasource: +# url: jdbc:postgresql://${PSQL_HOST:localhost}:${PSQL_PORT:5432}/${PSQL_DATABASE:tenantmanager}?ApplicationName=${spring.application.name:}-scheduler&cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true +# driverClassName: org.postgresql.Driver +# username: ${PSQL_USERNAME:tenantmanager} +# password: ${PSQL_PASSWORD:r3dact3d} +# platform: org.hibernate.dialect.PostgreSQL95Dialect +# hikari: +# maximumPoolSize: 2 +# data-source-properties: +# cachePrepStmts: true +# prepStmtCacheSize: 1000 +# prepStmtCacheSqlLimit: 2048 management: tracing: diff --git a/persistence-service-v1/persistence-service-server-v1/src/main/resources/application.yaml b/persistence-service-v1/persistence-service-server-v1/src/main/resources/application.yaml index 0af757fe1..b9545234c 100644 --- a/persistence-service-v1/persistence-service-server-v1/src/main/resources/application.yaml +++ b/persistence-service-v1/persistence-service-server-v1/src/main/resources/application.yaml @@ -125,6 +125,7 @@ fforesight: keycloak: ignored-endpoints: [ '/redaction-gateway-v1','/actuator/health/**', '/redaction-gateway-v1/async/download/with-ott/**', '/internal-api/**', '/redaction-gateway-v1/docs/swagger-ui', + '/redaction-gateway-v1/**', '/redaction-gateway-v1/docs/**','/redaction-gateway-v1/docs', '/api', '/api/','/api/docs/**','/api/docs','/api/docs/swagger-ui' ] enabled: true -- 2.47.2