Merge branch 'RED-9331' into 'main'

RED-9331: Explore possibilities for fair upload / analysis processing per tenant

See merge request fforesight/azure-ocr-service!8
This commit is contained in:
Maverick Studer 2024-08-27 09:26:21 +02:00
commit d2d8544439
7 changed files with 101 additions and 10 deletions

View File

@ -15,7 +15,7 @@ dependencies {
api("net.sourceforge.tess4j:tess4j:5.8.0")
api("com.iqser.red.commons:metric-commons:2.1.0")
api("com.iqser.red.commons:storage-commons:2.49.0")
api("com.knecon.fforesight:tenant-commons:0.21.0")
api("com.knecon.fforesight:tenant-commons:0.28.0")
api("com.pdftron:PDFNet:10.7.0")
api("org.apache.pdfbox:pdfbox:3.0.0")
api("org.apache.commons:commons-math3:3.6.1")

View File

@ -8,9 +8,13 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class MessagingConfiguration {
public static final String OCR_REQUEST_QUEUE = "ocr_request_queue";
public static final String OCR_RESPONSE_QUEUE = "ocr_response_queue";
public static final String OCR_STATUS_UPDATE_RESPONSE_QUEUE = "ocr_status_update_response_queue";
public static final String OCR_REQUEST_QUEUE_PREFIX = "ocr_request_queue";
public static final String OCR_REQUEST_EXCHANGE = "ocr_request_exchange";
public static final String OCR_DLQ = "ocr_dlq";
public static final String OCR_RESPONSE_EXCHANGE = "ocr_response_exchange";
public static final String OCR_STATUS_UPDATE_EXCHANGE = "ocr_status_update_exchange";
public static final String OCR_STATUS_UPDATE_DLQ = "ocr_status_update_dlq";
public static final String X_ERROR_INFO_HEADER = "x-error-message";
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";

View File

@ -0,0 +1,11 @@
package com.knecon.fforesight.service.ocr.v1.server.configuration;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration;
@Configuration
public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration {
}

View File

@ -39,7 +39,7 @@ public class NoStatusUpdateOcrMessageSender implements IOcrMessageSender {
public void sendOcrResponse(String dossierId, String fileId) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_RESPONSE_QUEUE, TenantContext.getTenantId(), new DocumentRequest(dossierId, fileId));
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_RESPONSE_EXCHANGE, TenantContext.getTenantId(), new DocumentRequest(dossierId, fileId));
}
}

View File

@ -32,6 +32,8 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class OcrMessageReceiver {
public static final String OCR_REQUEST_LISTENER_ID = "ocr-request-listener";
FileStorageService fileStorageService;
ObjectMapper objectMapper;
OCRService ocrService;
@ -39,7 +41,7 @@ public class OcrMessageReceiver {
@RabbitHandler
@RabbitListener(queues = MessagingConfiguration.OCR_REQUEST_QUEUE, concurrency = "1")
@RabbitListener(id = OCR_REQUEST_LISTENER_ID, concurrency = "1")
public void receiveOcr(Message in) throws IOException {
if (in.getMessageProperties().isRedelivered()) {

View File

@ -8,6 +8,7 @@ import com.knecon.fforesight.service.ocr.processor.service.IOcrMessageSender;
import com.knecon.fforesight.service.ocr.v1.api.model.DocumentRequest;
import com.knecon.fforesight.service.ocr.v1.api.model.OCRStatusUpdateResponse;
import com.knecon.fforesight.service.ocr.v1.server.configuration.MessagingConfiguration;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
@ -26,7 +27,8 @@ public class OcrMessageSender implements IOcrMessageSender {
public void sendOcrFinished(String fileId, int totalImages) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE,
TenantContext.getTenantId(),
OCRStatusUpdateResponse.builder().fileId(fileId).numberOfPagesToOCR(totalImages).numberOfOCRedPages(totalImages).ocrFinished(true).build());
}
@ -34,7 +36,8 @@ public class OcrMessageSender implements IOcrMessageSender {
public void sendOCRStarted(String fileId) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE,
TenantContext.getTenantId(),
OCRStatusUpdateResponse.builder().fileId(fileId).ocrStarted(true).build());
@ -43,7 +46,8 @@ public class OcrMessageSender implements IOcrMessageSender {
public void sendUpdate(String fileId, int finishedImages, int totalImages) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_RESPONSE_QUEUE,
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_STATUS_UPDATE_EXCHANGE,
TenantContext.getTenantId(),
OCRStatusUpdateResponse.builder().fileId(fileId).numberOfPagesToOCR(totalImages).numberOfOCRedPages(finishedImages).build());
@ -52,7 +56,7 @@ public class OcrMessageSender implements IOcrMessageSender {
public void sendOcrResponse(String dossierId, String fileId) {
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_RESPONSE_QUEUE, new DocumentRequest(dossierId, fileId));
rabbitTemplate.convertAndSend(MessagingConfiguration.OCR_RESPONSE_EXCHANGE, TenantContext.getTenantId(), new DocumentRequest(dossierId, fileId));
}
}

View File

@ -0,0 +1,70 @@
package com.knecon.fforesight.service.ocr.v1.server.queue;
import static com.knecon.fforesight.service.ocr.v1.server.configuration.MessagingConfiguration.OCR_DLQ;
import static com.knecon.fforesight.service.ocr.v1.server.configuration.MessagingConfiguration.OCR_REQUEST_EXCHANGE;
import static com.knecon.fforesight.service.ocr.v1.server.configuration.MessagingConfiguration.OCR_REQUEST_QUEUE_PREFIX;
import java.util.Map;
import java.util.Set;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService;
import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver;
@Service
public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver {
public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) {
super(rabbitQueueService, tenantProvider);
}
@Override
protected Set<TenantQueueConfiguration> getTenantQueueConfigs() {
return Set.of(TenantQueueConfiguration.builder()
.listenerId(OcrMessageReceiver.OCR_REQUEST_LISTENER_ID)
.exchangeName(OCR_REQUEST_EXCHANGE)
.queuePrefix(OCR_REQUEST_QUEUE_PREFIX)
.dlqName(OCR_DLQ)
.arguments(Map.of("x-max-priority", 2))
.build());
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
System.out.println("application ready invoked");
super.initializeQueues();
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}")
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
super.reactToTenantCreation(tenantCreatedEvent);
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}")
public void reactToTenantDeletion(TenantResponse tenantResponse) {
super.reactToTenantDeletion(tenantResponse);
}
}