From 3b33405cbfa43ccd5922e592b2c1b3dbf11470bc Mon Sep 17 00:00:00 2001 From: Maverick Studer Date: Tue, 27 Aug 2024 09:27:37 +0200 Subject: [PATCH] RED-9331: Explore possibilities for fair upload / analysis processing per tenant --- .../api/queue/LayoutParsingQueueNames.java | 7 +- .../build.gradle.kts | 2 +- .../build.gradle.kts | 2 +- .../configuration/MessagingConfiguration.java | 36 +++++++++++ .../TenantMessagingConfigurationImpl.java | 11 ++++ .../server/queue/MessageHandler.java | 9 ++- .../TenantExchangeMessageReceiverImpl.java | 64 +++++++++++++++++++ .../HeadlinesGoldStandardIntegrationTest.java | 50 +++++++++++---- .../server/utils/AbstractTest.java | 7 ++ .../src/test/resources/application.yml | 2 + 10 files changed, 169 insertions(+), 21 deletions(-) create mode 100644 layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/MessagingConfiguration.java create mode 100644 layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/TenantMessagingConfigurationImpl.java create mode 100644 layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/TenantExchangeMessageReceiverImpl.java diff --git a/layoutparser-service/layoutparser-service-internal-api/src/main/java/com/knecon/fforesight/service/layoutparser/internal/api/queue/LayoutParsingQueueNames.java b/layoutparser-service/layoutparser-service-internal-api/src/main/java/com/knecon/fforesight/service/layoutparser/internal/api/queue/LayoutParsingQueueNames.java index 081f5ea..962a564 100644 --- a/layoutparser-service/layoutparser-service-internal-api/src/main/java/com/knecon/fforesight/service/layoutparser/internal/api/queue/LayoutParsingQueueNames.java +++ 
b/layoutparser-service/layoutparser-service-internal-api/src/main/java/com/knecon/fforesight/service/layoutparser/internal/api/queue/LayoutParsingQueueNames.java @@ -2,6 +2,9 @@ package com.knecon.fforesight.service.layoutparser.internal.api.queue; public class LayoutParsingQueueNames { - public static final String LAYOUT_PARSING_REQUEST_QUEUE = "layout_parsing_request_queue"; - public static final String LAYOUT_PARSING_FINISHED_EVENT_QUEUE = "layout_parsing_response_queue"; + public static final String LAYOUT_PARSING_REQUEST_QUEUE_PREFIX = "layout_parsing_request_queue"; + public static final String LAYOUT_PARSING_REQUEST_EXCHANGE = "layout_parsing_request_exchange"; + public static final String LAYOUT_PARSING_RESPONSE_QUEUE_PREFIX = "layout_parsing_response_queue"; + public static final String LAYOUT_PARSING_RESPONSE_EXCHANGE = "layout_parsing_response_exchange"; + public static final String LAYOUT_PARSING_DLQ = "layout_parsing_dlq"; } diff --git a/layoutparser-service/layoutparser-service-processor/build.gradle.kts b/layoutparser-service/layoutparser-service-processor/build.gradle.kts index 4889e40..90e4212 100644 --- a/layoutparser-service/layoutparser-service-processor/build.gradle.kts +++ b/layoutparser-service/layoutparser-service-processor/build.gradle.kts @@ -16,7 +16,7 @@ dependencies { exclude("org.springframework.boot", "spring-boot-starter-security") exclude("org.springframework.boot", "spring-boot-starter-validation") } - implementation("com.knecon.fforesight:tenant-commons:0.21.0") + implementation("com.knecon.fforesight:tenant-commons:0.28.0") implementation("com.iqser.red.commons:storage-commons:2.45.0") implementation("org.apache.pdfbox:pdfbox:${pdfBoxVersion}") diff --git a/layoutparser-service/layoutparser-service-server/build.gradle.kts b/layoutparser-service/layoutparser-service-server/build.gradle.kts index 590c419..efac098 100644 --- a/layoutparser-service/layoutparser-service-server/build.gradle.kts +++ 
b/layoutparser-service/layoutparser-service-server/build.gradle.kts @@ -30,7 +30,7 @@ dependencies { implementation(project(":layoutparser-service-internal-api")) implementation("com.iqser.red.commons:storage-commons:2.45.0") - implementation("com.knecon.fforesight:tenant-commons:0.21.0") + implementation("com.knecon.fforesight:tenant-commons:0.28.0") implementation("com.knecon.fforesight:tracing-commons:0.5.0") implementation("com.knecon.fforesight:lifecycle-commons:0.6.0") implementation("org.springframework.boot:spring-boot-starter-actuator:${springBootStarterVersion}") diff --git a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/MessagingConfiguration.java b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/MessagingConfiguration.java new file mode 100644 index 0000000..daa589c --- /dev/null +++ b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/MessagingConfiguration.java @@ -0,0 +1,36 @@ +package com.knecon.fforesight.service.layoutparser.server.configuration; + +import static com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames.LAYOUT_PARSING_DLQ; + +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames; + +@Configuration +public class MessagingConfiguration { + + @Bean + public DirectExchange layoutParsingResponseExchange() { + + return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE); + } + + + @Bean + public DirectExchange layoutParsingRequestExchange() { + + 
return new DirectExchange(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE); + } + + + @Bean + public Queue layoutParsingDLQ() { + + return QueueBuilder.durable(LAYOUT_PARSING_DLQ).build(); + } + +} diff --git a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/TenantMessagingConfigurationImpl.java b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/TenantMessagingConfigurationImpl.java new file mode 100644 index 0000000..9389a9b --- /dev/null +++ b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/configuration/TenantMessagingConfigurationImpl.java @@ -0,0 +1,11 @@ +package com.knecon.fforesight.service.layoutparser.server.configuration; + +import org.springframework.context.annotation.Configuration; + +import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration; + +@Configuration +public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration { + + +} diff --git a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java index e4c4d8f..596b0a4 100644 --- a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java +++ b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java @@ -18,6 +18,7 @@ import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsi import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingRequest; import 
com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingType; import com.knecon.fforesight.service.layoutparser.processor.LayoutParsingPipeline; +import com.knecon.fforesight.tenantcommons.TenantContext; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -28,15 +29,17 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class MessageHandler { + public static final String LAYOUT_PARSING_REQUEST_LISTENER_ID = "layout-parsing-request-listener"; + private final LayoutParsingPipeline layoutParsingPipeline; private final ObjectMapper objectMapper; private final RabbitTemplate rabbitTemplate; private final static String X_PIPELINE_PREFIX = "X-PIPE-"; - @RabbitHandler - @RabbitListener(queues = LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE) @SneakyThrows + @RabbitHandler + @RabbitListener(id = LAYOUT_PARSING_REQUEST_LISTENER_ID) public void receiveLayoutParsingRequest(Message message) { LayoutParsingRequest layoutParsingRequest = objectMapper.readValue(message.getBody(), LayoutParsingRequest.class); @@ -57,7 +60,7 @@ public class MessageHandler { public void sendLayoutParsingFinishedEvent(LayoutParsingFinishedEvent layoutParsingFinishedEvent, Message message) { Arrays.stream(layoutParsingFinishedEvent.message().split("\n")).forEach(log::info); - rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE, layoutParsingFinishedEvent, m -> { + rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_RESPONSE_EXCHANGE, TenantContext.getTenantId(), layoutParsingFinishedEvent, m -> { var forwardHeaders = message.getMessageProperties() .getHeaders() .entrySet() diff --git a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/TenantExchangeMessageReceiverImpl.java 
b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/TenantExchangeMessageReceiverImpl.java new file mode 100644 index 0000000..5fd8533 --- /dev/null +++ b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/TenantExchangeMessageReceiverImpl.java @@ -0,0 +1,64 @@ +package com.knecon.fforesight.service.layoutparser.server.queue; + +import java.util.Set; + +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.service.layoutparser.internal.api.queue.LayoutParsingQueueNames; +import com.knecon.fforesight.tenantcommons.TenantProvider; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService; +import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver; + +@Service +public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver { + + public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) { + + super(rabbitQueueService, tenantProvider); + } + + + @Override + protected Set<TenantQueueConfiguration> getTenantQueueConfigs() { + + return Set.of(TenantQueueConfiguration.builder() + .listenerId(MessageHandler.LAYOUT_PARSING_REQUEST_LISTENER_ID) + .exchangeName(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_EXCHANGE) + .queuePrefix(LayoutParsingQueueNames.LAYOUT_PARSING_REQUEST_QUEUE_PREFIX) + .dlqName(LayoutParsingQueueNames.LAYOUT_PARSING_DLQ) + 
.build()); + } + + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + + org.slf4j.LoggerFactory.getLogger(TenantExchangeMessageReceiverImpl.class).info("application ready invoked"); + super.initializeQueues(); + } + + + @RabbitHandler + @RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}") + public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) { + + super.reactToTenantCreation(tenantCreatedEvent); + } + + + @RabbitHandler + @RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}") + public void reactToTenantDeletion(TenantResponse tenantResponse) { + + super.reactToTenantDeletion(tenantResponse); + + } + +} diff --git a/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/HeadlinesGoldStandardIntegrationTest.java b/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/HeadlinesGoldStandardIntegrationTest.java index c51c7ae..fb6d970 100644 --- a/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/HeadlinesGoldStandardIntegrationTest.java +++ b/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/HeadlinesGoldStandardIntegrationTest.java @@ -10,7 +10,9 @@ import java.util.stream.Collectors; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; @@ -55,6 +57,12 @@ public class 
HeadlinesGoldStandardIntegrationTest { @MockBean private RabbitTemplate rabbitTemplate; + @MockBean + private RabbitAdmin rabbitAdmin; + + @MockBean + private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; + @Autowired private ObjectMapper objectMapper; @@ -70,13 +78,18 @@ public class HeadlinesGoldStandardIntegrationTest { List<Metrics> metrics = new ArrayList<>(); metrics.add(getMetrics("files/syngenta/CustomerFiles/01 - CGA100251 - Acute Oral Toxicity (Up and Down Procedure) - Rat (1).pdf", - "files/headlineTest/01 - CGA100251 - Acute Oral Toxicity (Up and Down Procedure) - Rat (1)_REDACTION_LOG.json")); + "files/headlineTest/01 - CGA100251 - Acute Oral Toxicity (Up and Down Procedure) - Rat (1)_REDACTION_LOG.json")); metrics.add(getMetrics("files/syngenta/CustomerFiles/91 Trinexapac-ethyl_RAR_01_Volume_1_2018-02-23.pdf", - "files/headlineTest/91 Trinexapac-ethyl_RAR_01_Volume_1_2018-02-23_REDACTION_LOG.json")); - metrics.add(getMetrics("files/syngenta/CustomerFiles/S-Metolachlor_RAR_01_Volume_1_2018-09-06.pdf", "files/headlineTest/S-Metolachlor_RAR_01_Volume_1_2018-09-06_REDACTION_LOG.json")); + "files/headlineTest/91 Trinexapac-ethyl_RAR_01_Volume_1_2018-02-23_REDACTION_LOG.json")); + metrics.add(getMetrics("files/syngenta/CustomerFiles/S-Metolachlor_RAR_01_Volume_1_2018-09-06.pdf", + "files/headlineTest/S-Metolachlor_RAR_01_Volume_1_2018-09-06_REDACTION_LOG.json")); - double precision = metrics.stream().mapToDouble(Metrics::getPrecision).average().orElse(1.0); - double recall = metrics.stream().mapToDouble(Metrics::getRecall).average().orElse(1.0); + double precision = metrics.stream() + .mapToDouble(Metrics::getPrecision).average() + .orElse(1.0); + double recall = metrics.stream() + .mapToDouble(Metrics::getRecall).average() + .orElse(1.0); System.out.println("Precision is: " + precision + " recall is: " + recall); @@ -93,21 +106,28 @@ public class HeadlinesGoldStandardIntegrationTest { Set<Headline> goldStandardHeadlines = new HashSet<>(); var goldStandardLog 
= objectMapper.readValue(redactionLogResource.getInputStream(), RedactionLog.class); - goldStandardLog.getRedactionLogEntry().removeIf(r -> !r.isRedacted() || r.getChanges().get(r.getChanges().size() - 1).getType().equals(ChangeType.REMOVED)); - goldStandardLog.getRedactionLogEntry().forEach(e -> goldStandardHeadlines.add(new Headline(e.getPositions().get(0).getPage(), e.getValue()))); + goldStandardLog.getRedactionLogEntry() + .removeIf(r -> !r.isRedacted() || r.getChanges() + .get(r.getChanges().size() - 1).getType().equals(ChangeType.REMOVED)); + goldStandardLog.getRedactionLogEntry() + .forEach(e -> goldStandardHeadlines.add(new Headline(e.getPositions() + .get(0).getPage(), e.getValue()))); Document documentGraph = DocumentGraphFactory.buildDocumentGraph(LayoutParsingType.DOCUMINE, layoutParsingPipeline.parseLayout(LayoutParsingType.DOCUMINE, - pdfFileResource.getFile(), - new ImageServiceResponse(), - new TableServiceResponse(), - new VisualLayoutParsingResponse(), - Map.of("file",filePath))); + pdfFileResource.getFile(), + new ImageServiceResponse(), + new TableServiceResponse(), + new VisualLayoutParsingResponse(), + Map.of("file", filePath))); var foundHeadlines = documentGraph.streamAllSubNodes() .map(SemanticNode::getHeadline) .distinct() - .map(headlineNode -> new Headline(headlineNode.getPages().stream().findFirst().get().getNumber(), headlineNode.getTextBlock().getSearchText().stripTrailing())) + .map(headlineNode -> new Headline(headlineNode.getPages() + .stream() + .findFirst() + .get().getNumber(), headlineNode.getTextBlock().getSearchText().stripTrailing())) .toList(); Set<Headline> correct = new HashSet<>(); @@ -121,7 +141,9 @@ public class HeadlinesGoldStandardIntegrationTest { } } - missing = goldStandardHeadlines.stream().filter(h -> !correct.contains(h)).collect(Collectors.toSet()); + missing = goldStandardHeadlines.stream() + .filter(h -> !correct.contains(h)) + .collect(Collectors.toSet()); float precision = (float) correct.size() / (float) 
foundHeadlines.size(); float recall = (float) correct.size() / ((float) correct.size() + (float) missing.size()); diff --git a/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/utils/AbstractTest.java b/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/utils/AbstractTest.java index 961c015..8648c00 100644 --- a/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/utils/AbstractTest.java +++ b/layoutparser-service/layoutparser-service-server/src/test/java/com/knecon/fforesight/service/layoutparser/server/utils/AbstractTest.java @@ -10,7 +10,9 @@ import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; @@ -53,6 +55,11 @@ public abstract class AbstractTest { @MockBean private RabbitTemplate rabbitTemplate; + @MockBean + private RabbitAdmin rabbitAdmin; + + @MockBean + private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; protected final static String ORIGIN_FILE_ID = "origin"; protected final static String VISUAL_LAYOUT_FILE = "visual"; diff --git a/layoutparser-service/layoutparser-service-server/src/test/resources/application.yml b/layoutparser-service/layoutparser-service-server/src/test/resources/application.yml index 10dcef7..f13863b 100644 --- a/layoutparser-service/layoutparser-service-server/src/test/resources/application.yml +++ 
b/layoutparser-service/layoutparser-service-server/src/test/resources/application.yml @@ -39,3 +39,5 @@ management: prometheus.enabled: ${monitoring.enabled:false} health.enabled: true endpoints.web.exposure.include: prometheus, health + +POD_NAME: layoutparser-service