RED-9331: Explore possibilities for fair upload / analysis processing per tenant

This commit is contained in:
Maverick Studer 2024-08-27 09:27:37 +02:00
parent b2fa14dde2
commit 3b33405cbf
10 changed files with 169 additions and 21 deletions

View File

@ -2,6 +2,9 @@ package com.knecon.fforesight.service.layoutparser.internal.api.queue;
public class LayoutParsingQueueNames {
public static final String LAYOUT_PARSING_REQUEST_QUEUE = "layout_parsing_request_queue";
public static final String LAYOUT_PARSING_FINISHED_EVENT_QUEUE = "layout_parsing_response_queue";
public static final String LAYOUT_PARSING_REQUEST_QUEUE_PREFIX = "layout_parsing_request_queue";
public static final String LAYOUT_PARSING_REQUEST_EXCHANGE = "layout_parsing_request_exchange";
public static final String LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "layout_parsing_response_queue";
public static final String LAYOUT_PARSING_RESPONSE_EXCHANGE = "layout_parsing_response_exchange";
public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dlq";
}

View File

@ -16,7 +16,7 @@ dependencies {
exclude("org.springframework.boot", "spring-boot-starter-security")
exclude("org.springframework.boot", "spring-boot-starter-validation")
}
implementation("com.knecon.fforesight:tenant-commons:0.21.0")
implementation("com.knecon.fforesight:tenant-commons:0.28.0")
implementation("com.iqser.red.commons:storage-commons:2.45.0")
implementation("org.apache.pdfbox:pdfbox:${pdfBoxVersion}")

View File

@ -30,7 +30,7 @@ dependencies {
implementation(project(":layoutparser-service-internal-api"))
implementation("com.iqser.red.commons:storage-commons:2.45.0")
implementation("com.knecon.fforesight:tenant-commons:0.21.0")
implementation("com.knecon.fforesight:tenant-commons:0.28.0")
implementation("com.knecon.fforesight:tracing-commons:0.5.0")
implementation("com.knecon.fforesight:lifecycle-commons:0.6.0")
implementation("org.springframework.boot:spring-boot-starter-actuator:${springBootStarterVersion}")

View File

@ -0,0 +1,36 @@
package com.knecon.fforesight.service.layoutparser.server.configuration;
import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_DLQ;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames;
@Configuration
public class MessagingConfiguration {
@Bean
public DirectExchange layoutParsingResponseExchange() {
return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE);
}
@Bean
public DirectExchange layoutParsingRequestExchange() {
return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE);
}
@Bean
public Queue layoutParsingDLQ() {
return QueueBuilder.durable(LAYOUT_PARSING_DLQ).build();
}
}

View File

@ -0,0 +1,11 @@
package com.knecon.fforesight.service.layoutparser.server.configuration;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration;
@Configuration
public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration {
}

View File

@ -18,6 +18,7 @@ import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsi
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingRequest;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingType;
import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingPipeline;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -28,15 +29,17 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class MessageHandler {
public static final String LAYOUT_PARSING_REQUEST_LISTENER_ID = "layout-parsing-request-listener";
private final LayoutParsingPipeline layoutParsingPipeline;
private final ObjectMapper objectMapper;
private final RabbitTemplate rabbitTemplate;
private final static String X_PIPELINE_PREFIX = "X-PIPE-";
@RabbitHandler
@RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE)
@SneakyThrows
@RabbitHandler
@RabbitListener(id = LAYOUT_PARSING_REQUEST_LISTENER_ID)
public void receiveLayoutParsingRequest(Message message) {
LayoutParsingRequest layoutParsingRequest = objectMapper.readValue(message.getBody(), LayoutParsingRequest.class);
@ -57,7 +60,7 @@ public class MessageHandler {
public void sendLayoutParsingFinishedEvent(LayoutParsingFinishedEvent layoutParsingFinishedEvent, Message message) {
Arrays.stream(layoutParsingFinishedEvent.message().split("\n")).forEach(log::info);
rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE, layoutParsingFinishedEvent, m -> {
rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE, TenantContext.getTenantId(), layoutParsingFinishedEvent, m -> {
var forwardHeaders = message.getMessageProperties()
.getHeaders()
.entrySet()

View File

@ -0,0 +1,64 @@
package com.knecon.fforesight.service.layoutparser.server.queue;
import java.util.Set;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService;
import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver;
@Service
public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver {
public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) {
super(rabbitQueueService, tenantProvider);
}
@Override
protected Set<TenantQueueConfiguration> getTenantQueueConfigs() {
return Set.of(TenantQueueConfiguration.builder()
.listenerId(MessageHandler.LAYOUT_PARSING_REQUEST_LISTENER_ID)
.exchangeName(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE)
.queuePrefix(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE_PREFIX)
.dlqName(LayoutParsingQueueNames.LAYOUT_PARSING_DLQ)
.build());
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
System.out.println("application ready invoked");
super.initializeQueues();
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}")
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
super.reactToTenantCreation(tenantCreatedEvent);
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}")
public void reactToTenantDeletion(TenantResponse tenantResponse) {
super.reactToTenantDeletion(tenantResponse);
}
}

View File

@ -10,7 +10,9 @@ import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
@ -55,6 +57,12 @@ public class HeadlinesGoldStandardIntegrationTest {
@MockBean
private RabbitTemplate rabbitTemplate;
@MockBean
private RabbitAdmin rabbitAdmin;
@MockBean
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@Autowired
private ObjectMapper objectMapper;
@ -70,13 +78,18 @@ public class HeadlinesGoldStandardIntegrationTest {
List<Metrics> metrics = new ArrayList<>();
metrics.add(getMetrics("files/syngenta/CustomerFiles/01 - CGA100251 - Acute Oral Toxicity (Up and Down Procedure) - Rat (1).pdf",
"files/headlineTest/01 - CGA100251 - Acute Oral Toxicity (Up and Down Procedure) - Rat (1)_REDACTION_LOG.json"));
"files/headlineTest/01 - CGA100251 - Acute Oral Toxicity (Up and Down Procedure) - Rat (1)_REDACTION_LOG.json"));
metrics.add(getMetrics("files/syngenta/CustomerFiles/91 Trinexapac-ethyl_RAR_01_Volume_1_2018-02-23.pdf",
"files/headlineTest/91 Trinexapac-ethyl_RAR_01_Volume_1_2018-02-23_REDACTION_LOG.json"));
metrics.add(getMetrics("files/syngenta/CustomerFiles/S-Metolachlor_RAR_01_Volume_1_2018-09-06.pdf", "files/headlineTest/S-Metolachlor_RAR_01_Volume_1_2018-09-06_REDACTION_LOG.json"));
"files/headlineTest/91 Trinexapac-ethyl_RAR_01_Volume_1_2018-02-23_REDACTION_LOG.json"));
metrics.add(getMetrics("files/syngenta/CustomerFiles/S-Metolachlor_RAR_01_Volume_1_2018-09-06.pdf",
"files/headlineTest/S-Metolachlor_RAR_01_Volume_1_2018-09-06_REDACTION_LOG.json"));
double precision = metrics.stream().mapToDouble(Metrics::getPrecision).average().orElse(1.0);
double recall = metrics.stream().mapToDouble(Metrics::getRecall).average().orElse(1.0);
double precision = metrics.stream()
.mapToDouble(Metrics::getPrecision).average()
.orElse(1.0);
double recall = metrics.stream()
.mapToDouble(Metrics::getRecall).average()
.orElse(1.0);
System.out.println("Precision is: " + precision + " recall is: " + recall);
@ -93,21 +106,28 @@ public class HeadlinesGoldStandardIntegrationTest {
Set<Headline> goldStandardHeadlines = new HashSet<>();
var goldStandardLog = objectMapper.readValue(redactionLogResource.getInputStream(), RedactionLog.class);
goldStandardLog.getRedactionLogEntry().removeIf(r -> !r.isRedacted() || r.getChanges().get(r.getChanges().size() - 1).getType().equals(ChangeType.REMOVED));
goldStandardLog.getRedactionLogEntry().forEach(e -> goldStandardHeadlines.add(new Headline(e.getPositions().get(0).getPage(), e.getValue())));
goldStandardLog.getRedactionLogEntry()
.removeIf(r -> !r.isRedacted() || r.getChanges()
.get(r.getChanges().size() - 1).getType().equals(ChangeType.REMOVED));
goldStandardLog.getRedactionLogEntry()
.forEach(e -> goldStandardHeadlines.add(new Headline(e.getPositions()
.get(0).getPage(), e.getValue())));
Document documentGraph = DocumentGraphFactory.buildDocumentGraph(LayoutParsingType.DOCUMINE,
layoutParsingPipeline.parseLayout(LayoutParsingType.DOCUMINE,
pdfFileResource.getFile(),
new ImageServiceResponse(),
new TableServiceResponse(),
new VisualLayoutParsingResponse(),
Map.of("file",filePath)));
pdfFileResource.getFile(),
new ImageServiceResponse(),
new TableServiceResponse(),
new VisualLayoutParsingResponse(),
Map.of("file", filePath)));
var foundHeadlines = documentGraph.streamAllSubNodes()
.map(SemanticNode::getHeadline)
.distinct()
.map(headlineNode -> new Headline(headlineNode.getPages().stream().findFirst().get().getNumber(), headlineNode.getTextBlock().getSearchText().stripTrailing()))
.map(headlineNode -> new Headline(headlineNode.getPages()
.stream()
.findFirst()
.get().getNumber(), headlineNode.getTextBlock().getSearchText().stripTrailing()))
.toList();
Set<Headline> correct = new HashSet<>();
@ -121,7 +141,9 @@ public class HeadlinesGoldStandardIntegrationTest {
}
}
missing = goldStandardHeadlines.stream().filter(h -> !correct.contains(h)).collect(Collectors.toSet());
missing = goldStandardHeadlines.stream()
.filter(h -> !correct.contains(h))
.collect(Collectors.toSet());
float precision = (float) correct.size() / (float) foundHeadlines.size();
float recall = (float) correct.size() / ((float) correct.size() + (float) missing.size());

View File

@ -10,7 +10,9 @@ import java.util.Optional;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
@ -53,6 +55,11 @@ public abstract class AbstractTest {
@MockBean
private RabbitTemplate rabbitTemplate;
@MockBean
private RabbitAdmin rabbitAdmin;
@MockBean
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
protected final static String ORIGIN_FILE_ID = "origin";
protected final static String VISUAL_LAYOUT_FILE = "visual";

View File

@ -39,3 +39,5 @@ management:
prometheus.enabled: ${monitoring.enabled:false}
health.enabled: true
endpoints.web.exposure.include: prometheus, health
POD_NAME: layoutparser-service