diff --git a/llm-service/llm-service-api/src/main/java/com/knecon/fforesight/llm/service/QueueNames.java b/llm-service/llm-service-api/src/main/java/com/knecon/fforesight/llm/service/QueueNames.java index 8517b83..423b640 100644 --- a/llm-service/llm-service-api/src/main/java/com/knecon/fforesight/llm/service/QueueNames.java +++ b/llm-service/llm-service-api/src/main/java/com/knecon/fforesight/llm/service/QueueNames.java @@ -2,8 +2,10 @@ package com.knecon.fforesight.llm.service; public class QueueNames { - public static final String LLM_NER_SERVICE_QUEUE = "llm_entity_request_queue"; - public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = "llm_entity_response_queue"; - public static final String LLM_NER_SERVICE_DLQ = "llm_entity_dead_letter_queue"; + public static final String LLM_NER_REQUEST_QUEUE_PREFIX = "llm_entity_request_queue"; + public static final String LLM_NER_REQUEST_EXCHANGE = "llm_entity_request_exchange"; + public static final String LLM_NER_RESPONSE_QUEUE_PREFIX = "llm_entity_response_queue"; + public static final String LLM_NER_RESPONSE_EXCHANGE = "llm_entity_response_exchange"; + public static final String LLM_NER_DLQ = "llm_entity_dlq"; } diff --git a/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/SystemMessages.java b/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/SystemMessages.java index f8f4094..fbad384 100644 --- a/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/SystemMessages.java +++ b/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/SystemMessages.java @@ -13,10 +13,10 @@ public class SystemMessages { The output should be strictly JSON format and nothing else, formatted as such: ``` { - PII: ["Jennifer Durando, BS", "01223 45678", "mimi.lang@smithcorp.com", "+44 (0)1252 392460"], - ADDRESS: ["Product Safety Labs 2394 US Highway 130 Dayton, NJ 08810 USA", "Syngenta Crop Protection, LLC 410 Swing Road Post Office Box 18300 Greensboro, NC 27419-8300 USA"] - COMPANY: ["Syngenta", "EFSA"] - COUNTRY: ["USA"] + "PII": ["Jennifer Durando, BS", "01223 45678", "mimi.lang@smithcorp.com", "+44 (0)1252 392460"], + "ADDRESS": ["Product Safety Labs 2394 US Highway 130 Dayton, NJ 08810 USA", "Syngenta Crop Protection, LLC 410 Swing Road Post Office Box 18300 Greensboro, NC 27419-8300 USA"] + "COMPANY": ["Syngenta", "EFSA"] + "COUNTRY": ["USA"] } ``` Always replace linebreaks with whitespaces, but except that, ensure the entities match the text in the document exactly. diff --git a/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmNerService.java b/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmNerService.java index 7aff33f..3501ca2 100644 --- a/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmNerService.java +++ b/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmNerService.java @@ -74,19 +74,20 @@ public class LlmNerService { Document document = buildDocument(llmNerMessage); ChunkingResponse chunks = readChunks(llmNerMessage.getChunksStorageId()); List allEntities = Collections.synchronizedList(new LinkedList<>()); - + log.info("Finished data prep for {}", llmNerMessage.getIdentifier()); List> entityFutures = chunks.getData() .stream() .map(chunk -> getLlmNerEntitiesFuture(chunk, document)) .toList(); + log.info("Awaiting api calls for {}", llmNerMessage.getIdentifier()); for (CompletableFuture entityFuture : entityFutures) { EntitiesWithUsage entitiesWithUsage = entityFuture.get(); allEntities.addAll(entitiesWithUsage.entities()); completionTokenCount += entitiesWithUsage.completionsUsage().getCompletionTokens(); promptTokenCount += entitiesWithUsage.completionsUsage().getPromptTokens(); } - + log.info("Storing files for {}", llmNerMessage.getIdentifier()); storageService.storeJSONObject(TenantContext.getTenantId(), llmNerMessage.getResultStorageId(), new LlmNerEntities(allEntities)); long duration = System.currentTimeMillis() - start; log.info("Found {} named entities for {} in {} with {} prompt tokens and {} completion tokens.", diff --git a/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmRessource.java b/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmRessource.java index ba5ea33..95fca32 100644 --- a/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmRessource.java +++ b/llm-service/llm-service-processor/src/main/java/com/knecon/fforesight/llm/service/services/LlmRessource.java @@ -2,7 +2,6 @@ package com.knecon.fforesight.llm.service.services; import java.util.concurrent.Semaphore; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.azure.ai.openai.OpenAIAsyncClient; @@ -10,7 +9,7 @@ import com.azure.ai.openai.OpenAIClient; import com.azure.ai.openai.OpenAIClientBuilder; import com.azure.ai.openai.models.ChatCompletions; import com.azure.ai.openai.models.ChatCompletionsOptions; -import com.azure.core.credential.AzureKeyCredential; +import com.azure.core.credential.KeyCredential; import com.knecon.fforesight.llm.service.LlmServiceSettings; import lombok.AccessLevel; @@ -29,12 +28,14 @@ public class LlmRessource { Semaphore concurrencyLimitingSemaphore; - public LlmRessource(@Value("${llm-service.azureOpenAiEndpoint}") String azureEndpoint, @Value("${llm-service.azureOpenAiKey}") String azureKey, LlmServiceSettings settings) { + public LlmRessource(LlmServiceSettings settings) { this.settings = settings; this.concurrencyLimitingSemaphore = new Semaphore(settings.getConcurrency()); - this.asyncClient = new OpenAIClientBuilder().credential(new AzureKeyCredential(azureKey)).endpoint(azureEndpoint).buildAsyncClient(); - this.client = new OpenAIClientBuilder().credential(new AzureKeyCredential(azureKey)).endpoint(azureEndpoint).buildClient(); + this.asyncClient = new OpenAIClientBuilder().credential(new KeyCredential(settings.getAzureOpenAiKey())).endpoint(settings.getAzureOpenAiEndpoint()).buildAsyncClient(); + this.client = new OpenAIClientBuilder().credential(new KeyCredential(settings.getAzureOpenAiKey())).endpoint(settings.getAzureOpenAiEndpoint()).buildClient(); + + log.info("Initialized client for endpoint {} and key {}", settings.getAzureOpenAiEndpoint(), settings.getAzureOpenAiKey()); } @@ -50,13 +51,8 @@ public class LlmRessource { concurrencyLimitingSemaphore.acquire(); ChatCompletions chatCompletions = client.getChatCompletions(settings.getModel(), options); concurrencyLimitingSemaphore.release(); + return chatCompletions; } - - public int getCurrentConcurrency() { - - return settings.getConcurrency() - concurrencyLimitingSemaphore.availablePermits(); - } - } diff --git a/llm-service/llm-service-server/build.gradle.kts b/llm-service/llm-service-server/build.gradle.kts index 91c4ed3..0982241 100644 --- a/llm-service/llm-service-server/build.gradle.kts +++ b/llm-service/llm-service-server/build.gradle.kts @@ -32,7 +32,8 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-websocket:$springBootVersion") implementation("org.springframework.security:spring-security-messaging:$springSecurityVersion") implementation("com.iqser.red.commons:storage-commons:2.49.0") - implementation("com.knecon.fforesight:keycloak-commons:0.29.0") + implementation("com.knecon.fforesight:keycloak-commons:0.30.0") + implementation("com.knecon.fforesight:tenant-commons:0.28.0") implementation("com.knecon.fforesight:swagger-commons:0.7.0") implementation("ch.qos.logback:logback-classic") diff --git a/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessageHandler.java b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessageHandler.java index 67020c5..e856368 100644 --- a/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessageHandler.java +++ b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessageHandler.java @@ -1,8 +1,5 @@ package com.knecon.fforesight.llm.service.queue; -import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_QUEUE; -import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE; - import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; @@ -13,11 +10,12 @@ import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; import com.knecon.fforesight.llm.service.LlmNerMessage; import com.knecon.fforesight.llm.service.LlmNerResponseMessage; +import com.knecon.fforesight.llm.service.QueueNames; import com.knecon.fforesight.llm.service.services.LlmNerService; +import com.knecon.fforesight.tenantcommons.TenantContext; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import lombok.experimental.FieldDefaults; import lombok.extern.slf4j.Slf4j; @@ -27,36 +25,44 @@ import lombok.extern.slf4j.Slf4j; @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) public class MessageHandler { + public static final String LLM_NER_REQUEST_LISTENER_ID = "llm-ner-request-listener"; + LlmNerService llmNerService; ObjectMapper mapper; RabbitTemplate rabbitTemplate; @RabbitHandler - @RabbitListener(queues = LLM_NER_SERVICE_QUEUE) + @RabbitListener(id = LLM_NER_REQUEST_LISTENER_ID, concurrency = "1") public void receiveNerRequest(Message message) { if (message.getMessageProperties().isRedelivered()) { - throw new AmqpRejectAndDontRequeueException("Redelivered OCR Request, aborting..."); + throw new AmqpRejectAndDontRequeueException("Redelivered LLM NER Request, aborting..."); } LlmNerMessage llmNerMessage = parseLlmNerMessage(message); + log.info("Starting NER with LLM for {}", llmNerMessage.getIdentifier()); + LlmNerService.Usage usage = llmNerService.runNer(llmNerMessage); LlmNerResponseMessage llmNerResponseMessage = new LlmNerResponseMessage(llmNerMessage.getIdentifier(), - usage.completionTokenCount(), usage.promptTokenCount(), + usage.completionTokenCount(), Math.toIntExact(usage.durationMillis())); - - rabbitTemplate.convertAndSend(LLM_NER_SERVICE_RESPONSE_QUEUE, llmNerResponseMessage); + log.info("LLM NER finished for {}", llmNerMessage.getIdentifier()); + rabbitTemplate.convertAndSend(QueueNames.LLM_NER_RESPONSE_EXCHANGE, TenantContext.getTenantId(), llmNerResponseMessage); } - @SneakyThrows private LlmNerMessage parseLlmNerMessage(Message message) { - return mapper.readValue(message.getBody(), LlmNerMessage.class); + try { + return mapper.readValue(message.getBody(), LlmNerMessage.class); + } catch (Exception e) { + log.error("Failed to parse LLM NER message:\n {}", new String(message.getBody())); + throw new RuntimeException(e); + } } } diff --git a/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessagingConfiguration.java b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessagingConfiguration.java index 40c6aba..ebdaa9c 100644 --- a/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessagingConfiguration.java +++ b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/MessagingConfiguration.java @@ -1,8 +1,6 @@ package com.knecon.fforesight.llm.service.queue; -import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_DLQ; -import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_QUEUE; -import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE; +import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_DLQ; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; @@ -15,27 +13,10 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public class MessagingConfiguration { - @Bean - public Queue llmNerRequestQueue() { - - return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build(); - } - - - @Bean - public Queue llmNerResponseQueue() { - - return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ) - .build(); - } - - @Bean public Queue llmNerResponseDLQ() { - return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build(); + return QueueBuilder.durable(LLM_NER_DLQ).build(); } } diff --git a/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/TenantExchangeMessageReceiverImpl.java b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/TenantExchangeMessageReceiverImpl.java new file mode 100644 index 0000000..18fe904 --- /dev/null +++ b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/TenantExchangeMessageReceiverImpl.java @@ -0,0 +1,67 @@ +package com.knecon.fforesight.llm.service.queue; + +import java.util.Map; +import java.util.Set; + +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import com.knecon.fforesight.llm.service.QueueNames; +import com.knecon.fforesight.tenantcommons.TenantProvider; +import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent; +import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration; +import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService; +import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver; + +@Service +public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver { + + public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) { + + super(rabbitQueueService, tenantProvider); + } + + + @Override + protected Set getTenantQueueConfigs() { + + return Set.of(TenantQueueConfiguration.builder() + .listenerId(MessageHandler.LLM_NER_REQUEST_LISTENER_ID) + .exchangeName(QueueNames.LLM_NER_REQUEST_EXCHANGE) + .queuePrefix(QueueNames.LLM_NER_REQUEST_QUEUE_PREFIX) + .dlqName(QueueNames.LLM_NER_DLQ) + .arguments(Map.of("x-max-priority", 2)) + .build()); + } + + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + + System.out.println("application ready invoked"); + super.initializeQueues(); + } + + + @RabbitHandler + @RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}") + public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) { + + super.reactToTenantCreation(tenantCreatedEvent); + } + + + @RabbitHandler + @RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}") + public void reactToTenantDeletion(TenantResponse tenantResponse) { + + super.reactToTenantDeletion(tenantResponse); + + } + +} + diff --git a/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/TenantMessagingConfigurationImpl.java b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/TenantMessagingConfigurationImpl.java new file mode 100644 index 0000000..631fa25 --- /dev/null +++ b/llm-service/llm-service-server/src/main/java/com/knecon/fforesight/llm/service/queue/TenantMessagingConfigurationImpl.java @@ -0,0 +1,11 @@ +package com.knecon.fforesight.llm.service.queue; + +import org.springframework.context.annotation.Configuration; + +import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration; + +@Configuration +public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration { + + +} \ No newline at end of file diff --git a/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/AbstractLlmServiceIntegrationTest.java b/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/AbstractLlmServiceIntegrationTest.java index 8f6bcb9..1e448f7 100644 --- a/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/AbstractLlmServiceIntegrationTest.java +++ b/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/AbstractLlmServiceIntegrationTest.java @@ -6,10 +6,11 @@ import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability; import org.springframework.boot.test.context.SpringBootTest; @@ -27,12 +28,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.iqser.red.storage.commons.StorageAutoConfiguration; import com.iqser.red.storage.commons.service.StorageService; import com.iqser.red.storage.commons.utils.FileSystemBackedStorageService; -import com.knecon.fforesight.keycloakcommons.DefaultKeyCloakCommonsAutoConfiguration; -import com.knecon.fforesight.swaggercommons.SpringDocAutoConfiguration; -import com.knecon.fforesight.tenantcommons.MultiTenancyAutoConfiguration; import com.knecon.fforesight.tenantcommons.TenantContext; import com.knecon.fforesight.tenantcommons.TenantsClient; import com.knecon.fforesight.tenantcommons.model.TenantResponse; +import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration; @ComponentScan @ExtendWith(SpringExtension.class) @@ -47,16 +46,22 @@ public abstract class AbstractLlmServiceIntegrationTest { protected StorageService storageService; @MockBean TenantsClient tenantsClient; + @Autowired + ObjectMapper objectMapper; @MockBean RabbitTemplate rabbitTemplate; + @MockBean + RabbitAdmin rabbitAdmin; + @MockBean + RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; + @MockBean + TenantMessagingConfiguration tenantMessagingConfiguration; @BeforeEach public void setupOptimize() { - var tenant = TenantResponse.builder() - .tenantId(TEST_TENANT) - .build(); + var tenant = TenantResponse.builder().tenantId(TEST_TENANT).build(); TenantContext.setTenantId(TEST_TENANT); @@ -68,7 +73,6 @@ public abstract class AbstractLlmServiceIntegrationTest { @SuppressWarnings("PMD.TestClassWithoutTestCases") @Configuration @EnableAutoConfiguration(exclude = {RabbitAutoConfiguration.class}) - @ImportAutoConfiguration({StorageAutoConfiguration.class, MultiTenancyAutoConfiguration.class, SpringDocAutoConfiguration.class, DefaultKeyCloakCommonsAutoConfiguration.class}) @ComponentScan(excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = StorageAutoConfiguration.class)}) public static class TestConfiguration { diff --git a/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/LlmNerServiceTest.java b/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/LlmNerServiceTest.java index 565c4c1..620171f 100644 --- a/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/LlmNerServiceTest.java +++ b/llm-service/llm-service-server/src/test/java/com/knecon/fforesight/llm/service/LlmNerServiceTest.java @@ -4,6 +4,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import java.util.Set; import org.junit.jupiter.api.Disabled; @@ -33,7 +34,7 @@ public class LlmNerServiceTest extends AbstractLlmServiceIntegrationTest { @SneakyThrows public void testLlmNer() { - Path folder = Path.of("/home/kschuettler/Downloads/New Folder (2)/2f4cc06f-d941-4f87-8928-b5d8a9476387/75ecec8c698f561c91d1a3e9f81dad7c"); + Path folder = Path.of("/home/kschuettler/Downloads/New Folder (5)/18299ec0-7659-496a-a44a-194bbffb1700/1fb7d49ae389469f60db516cf81a3510"); LlmNerMessage message = prepStorage(folder); llmNerService.runNer(message); Path tmpFile = Path.of("tmp", "AAA_LLM_ENTITIES", "entities.json"); @@ -71,6 +72,7 @@ public class LlmNerServiceTest extends AbstractLlmServiceIntegrationTest { private static LlmNerMessage buildMessage(Path folder) { return LlmNerMessage.builder() + .identifier(Map.of("file", folder.getFileName().toString())) .chunksStorageId(folder + DOCUMENT_CHUNKS) .documentPagesStorageId(folder + DOCUMENT_PAGES) .documentTextStorageId(folder + DOCUMENT_TEXT)