CLARI-30 - reworked ocr service to use queues for request/response, moved DLQ listener to consumer of this service. Removed rest API calls
This commit is contained in:
parent
c4c20d15ae
commit
2e37b8eec9
12
README.md
12
README.md
@ -74,12 +74,14 @@ String languages = "deu+eng"; // Defines languages loaded into Tesseract as 3-ch
|
||||
```
|
||||
## Integration
|
||||
|
||||
The OCR-service communicates via RabbitMQ and uses the queues `ocrQueue`, `ocrDLQ`, and `ocr_status_update_response_queue`.
|
||||
The OCR-service communicates via RabbitMQ and uses the queues `ocr_request_queue`, `ocr_response_queue`,
|
||||
`ocr_dead_letter_queue`, and `ocr_status_update_response_queue`.
|
||||
|
||||
### ocrQueue
|
||||
### ocr_request_queue
|
||||
This queue is used to start the OCR process; a DocumentRequest must be passed as the message. The service will then download the PDF from the provided cloud storage.
|
||||
### ocr_response_queue
|
||||
This queue is used to signal the end of processing.
|
||||
### ocr_dead_letter_queue
|
||||
This queue is used to signal an error has occurred during processing.
|
||||
### ocr_status_update_response_queue
|
||||
This queue is used by the OCR service to report the progress of the ongoing OCR on an image-by-image basis. The total number of images may change when fewer images are found than initially assumed.
|
||||
This queue is also used to signal the end of processing.
|
||||
### ocrDLQ
|
||||
This queue is used to signal an error has occurred during processing.
|
||||
|
||||
@ -15,5 +15,6 @@ public class OCRStatusUpdateResponse {
|
||||
private int numberOfPagesToOCR;
|
||||
private int numberOfOCRedPages;
|
||||
private boolean ocrFinished;
|
||||
private boolean ocrStarted;
|
||||
|
||||
}
|
||||
|
||||
@ -7,7 +7,10 @@ public interface IOcrMessageSender {
|
||||
|
||||
void sendUpdate(String fileId, int finishedImages, int totalImages);
|
||||
|
||||
void sendOCRStarted(String fileId);
|
||||
|
||||
void sendOcrFinished(String fileId, int totalImages);
|
||||
|
||||
void sendOcrResponse(String dossierId, String fileId);
|
||||
|
||||
}
|
||||
|
||||
@ -26,5 +26,5 @@ public class OcrServiceSettings {
|
||||
COSName ocrMarkedContentTag = COSName.getPDFName("KNECON_OCR");
|
||||
boolean boldDetection = true; // if true, bold detection will be attempted
|
||||
double boldThreshold = 0.5; // Words are opened with a brick of average stroke width, if the ratio of remaining pixels is higher the word is determined bold.
|
||||
|
||||
boolean sendStatusUpdates = true;
|
||||
}
|
||||
|
||||
@ -5,7 +5,6 @@ import org.springframework.boot.actuate.autoconfigure.security.servlet.Managemen
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
@ -13,7 +12,6 @@ import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import com.iqser.red.pdftronlogic.commons.InvisibleElementRemovalService;
|
||||
import com.iqser.red.pdftronlogic.commons.WatermarkRemovalService;
|
||||
import com.knecon.fforesight.service.ocr.processor.OcrServiceProcessorConfiguration;
|
||||
import com.knecon.fforesight.service.ocr.v1.server.client.FileStatusProcessingUpdateClient;
|
||||
import com.knecon.fforesight.service.ocr.v1.server.queue.MessagingConfiguration;
|
||||
import com.iqser.red.storage.commons.StorageAutoConfiguration;
|
||||
import com.knecon.fforesight.tenantcommons.MultiTenancyAutoConfiguration;
|
||||
@ -25,7 +23,6 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
@ImportAutoConfiguration({MultiTenancyAutoConfiguration.class})
|
||||
@SpringBootApplication(exclude = {SecurityAutoConfiguration.class, ManagementWebSecurityAutoConfiguration.class})
|
||||
@Import({MessagingConfiguration.class, StorageAutoConfiguration.class, OcrServiceProcessorConfiguration.class})
|
||||
@EnableFeignClients(basePackageClasses = FileStatusProcessingUpdateClient.class)
|
||||
public class Application {
|
||||
|
||||
/**
|
||||
|
||||
@ -1,10 +0,0 @@
|
||||
package com.knecon.fforesight.service.ocr.v1.server.client;
|
||||
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
|
||||
import com.iqser.red.service.persistence.service.v1.api.internal.resources.FileStatusProcessingUpdateResource;
|
||||
|
||||
@FeignClient(name = "FileStatusProcessingUpdateResource", url = "${persistence-service.url}")
|
||||
public interface FileStatusProcessingUpdateClient extends FileStatusProcessingUpdateResource {
|
||||
|
||||
}
|
||||
@ -11,8 +11,9 @@ import lombok.RequiredArgsConstructor;
|
||||
@RequiredArgsConstructor
|
||||
public class MessagingConfiguration {
|
||||
|
||||
public static final String OCR_QUEUE = "ocrQueue";
|
||||
public static final String OCR_DLQ = "ocrDLQ";
|
||||
public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
|
||||
public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
|
||||
public static final String OCR_DLQ = "ocr_dead_letter_queue";
|
||||
|
||||
public static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
|
||||
public static final String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
|
||||
@ -27,7 +28,7 @@ public class MessagingConfiguration {
|
||||
@Bean
|
||||
public Queue ocrQueue() {
|
||||
|
||||
return QueueBuilder.durable(OCR_QUEUE)
|
||||
return QueueBuilder.durable(OCR_REQUEST_QUEUE)
|
||||
.withArgument(X_DEAD_LETTER_EXCHANGE, "")
|
||||
.withArgument(X_DEAD_LETTER_ROUTING_KEY, OCR_DLQ)
|
||||
.withArgument(X_MAX_PRIORITY, 2)
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
package com.knecon.fforesight.service.ocr.v1.server.queue;
|
||||
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.knecon.fforesight.service.ocr.processor.service.IOcrMessageSender;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
|
||||
/**
 * {@link IOcrMessageSender} implementation that publishes no progress messages.
 *
 * <p>Registered as a Spring service only when the property
 * {@code ocr-service.sendStatusUpdates} has the value {@code false}. The three status
 * callbacks ({@code sendOcrFinished}, {@code sendOCRStarted}, {@code sendUpdate}) have empty
 * bodies; only {@code sendOcrResponse} sends a message.
 *
 * <p>NOTE(review): no {@code matchIfMissing} is given, so this bean is presumably not created
 * when the property is absent from the environment — confirm against the Spring Boot docs.
 * NOTE(review): {@code OcrMessageReceiver} is conditional on the same property being
 * {@code "true"} and injects the concrete {@code OcrMessageSender} — confirm that something
 * actually uses this bean when the property is {@code false}.
 */
@Service
|
||||
@RequiredArgsConstructor
|
||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||
@ConditionalOnProperty(value = "ocr-service.sendStatusUpdates", havingValue = "false")
|
||||
public class NoStatusUpdateOcrMessageSender implements IOcrMessageSender {
|
||||
|
||||
// Made private final by @FieldDefaults and supplied through the constructor generated by @RequiredArgsConstructor.
RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
/**
 * Does nothing: no "finished" status message is sent by this implementation.
 *
 * @param fileId      id of the file (unused here)
 * @param totalImages total image count (unused here)
 */
public void sendOcrFinished(String fileId, int totalImages) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
 * Does nothing: no "started" status message is sent by this implementation.
 *
 * @param fileId id of the file (unused here)
 */
public void sendOCRStarted(String fileId) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
 * Does nothing: no progress update is sent by this implementation.
 *
 * @param fileId         id of the file (unused here)
 * @param finishedImages number of finished images (unused here)
 * @param totalImages    total image count (unused here)
 */
public void sendUpdate(String fileId, int finishedImages, int totalImages) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
 * Sends a new {@link DocumentRequest} built from the given ids through
 * {@code rabbitTemplate.convertAndSend}, passing
 * {@link MessagingConfiguration#OCR_RESPONSE_QUEUE} as the first argument.
 *
 * @param dossierId dossier id placed into the outgoing {@link DocumentRequest}
 * @param fileId    file id placed into the outgoing {@link DocumentRequest}
 */
public void sendOcrResponse(String dossierId, String fileId) {
|
||||
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_RESPONSE_QUEUE, new DocumentRequest(dossierId, fileId));
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,31 +1,23 @@
|
||||
package com.knecon.fforesight.service.ocr.v1.server.queue;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.FileSystemUtils;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.knecon.fforesight.service.ocr.processor.service.OsUtils;
|
||||
import com.knecon.fforesight.service.ocr.v1.server.client.FileStatusProcessingUpdateClient;
|
||||
import com.knecon.fforesight.service.ocr.processor.service.FileStorageService;
|
||||
import com.knecon.fforesight.service.ocr.processor.service.OCRService;
|
||||
import com.knecon.fforesight.service.ocr.processor.service.OsUtils;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemplate.dossier.file.FileErrorInfo;
|
||||
|
||||
import feign.FeignException;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
@ -34,17 +26,18 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(value = "ocr-service.sendStatusUpdates", havingValue = "true")
|
||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||
public class OcrMessageReceiver {
|
||||
|
||||
FileStorageService fileStorageService;
|
||||
ObjectMapper objectMapper;
|
||||
FileStatusProcessingUpdateClient fileStatusProcessingUpdateClient;
|
||||
OCRService ocrService;
|
||||
OcrMessageSender ocrMessageSender;
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_QUEUE, concurrency = "1")
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_REQUEST_QUEUE, concurrency = "1")
|
||||
public void receiveOcr(Message in) throws IOException {
|
||||
|
||||
DocumentRequest ocrRequestMessage = objectMapper.readValue(in.getBody(), DocumentRequest.class);
|
||||
@ -56,7 +49,7 @@ public class OcrMessageReceiver {
|
||||
log.info("--------------------------------------------------------------------------");
|
||||
log.info("Start ocr for file with dossierId {} and fileId {}", dossierId, fileId);
|
||||
|
||||
setStatusOcrProcessing(dossierId, fileId);
|
||||
ocrMessageSender.sendOCRStarted(fileId);
|
||||
|
||||
tmpDir.toFile().mkdirs();
|
||||
File documentFile = tmpDir.resolve("document.pdf").toFile();
|
||||
@ -68,7 +61,7 @@ public class OcrMessageReceiver {
|
||||
|
||||
fileStorageService.storeFiles(dossierId, fileId, documentFile, viewerDocumentFile);
|
||||
|
||||
fileStatusProcessingUpdateClient.ocrSuccessful(dossierId, fileId);
|
||||
ocrMessageSender.sendOcrResponse(dossierId, fileId);
|
||||
} catch (Exception e) {
|
||||
log.warn("An exception occurred in ocr file stage: {}", e.getMessage());
|
||||
in.getMessageProperties().getHeaders().put(MessagingConfiguration.X_ERROR_INFO_HEADER, e.getMessage());
|
||||
@ -79,32 +72,4 @@ public class OcrMessageReceiver {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = MessagingConfiguration.OCR_DLQ, concurrency = "1")
|
||||
public void receiveOcrDLQ(Message failedMessage) throws IOException {
|
||||
|
||||
DocumentRequest ocrRequestMessage = objectMapper.readValue(failedMessage.getBody(), DocumentRequest.class);
|
||||
|
||||
log.info("OCR DQL received: {}", ocrRequestMessage);
|
||||
String errorMessage = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_HEADER);
|
||||
OffsetDateTime timestamp = failedMessage.getMessageProperties().getHeader(MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER);
|
||||
timestamp = timestamp != null ? timestamp : OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS);
|
||||
fileStatusProcessingUpdateClient.ocrFailed(ocrRequestMessage.getDossierId(),
|
||||
ocrRequestMessage.getFileId(),
|
||||
new FileErrorInfo(errorMessage, MessagingConfiguration.OCR_DLQ, "ocr-service", timestamp));
|
||||
}
|
||||
|
||||
|
||||
private void setStatusOcrProcessing(String dossierId, String fileId) {
|
||||
|
||||
try {
|
||||
fileStatusProcessingUpdateClient.ocrProcessing(dossierId, fileId);
|
||||
} catch (FeignException e) {
|
||||
if (e.status() == HttpStatus.CONFLICT.value()) {
|
||||
throw new AmqpRejectAndDontRequeueException(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.knecon.fforesight.service.ocr.processor.service.IOcrMessageSender;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
|
||||
import com.knecon.fforesight.service.ocr.v1.api.model.OCRStatusUpdateResponse;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
@ -17,13 +18,19 @@ public class OcrMessageSender implements IOcrMessageSender {
|
||||
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
public void sendOcrFinished(String fileId, int totalImages) {
|
||||
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE,
|
||||
OCRStatusUpdateResponse.builder().fileId(fileId).numberOfPagesToOCR(totalImages).numberOfOCRedPages(totalImages).ocrFinished(true).build());
|
||||
}
|
||||
|
||||
public void sendOCRStarted(String fileId) {
|
||||
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE,
|
||||
OCRStatusUpdateResponse.builder().fileId(fileId).ocrStarted(true).build());
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void sendUpdate(String fileId, int finishedImages, int totalImages) {
|
||||
|
||||
@ -32,4 +39,10 @@ public class OcrMessageSender implements IOcrMessageSender {
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void sendOcrResponse(String dossierId, String fileId){
|
||||
|
||||
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_RESPONSE_QUEUE, new DocumentRequest(dossierId,fileId));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user