Merge branch 'tenant-queues' into 'main'

Tenant queues

See merge request fforesight/llm-service!17
This commit is contained in:
Kilian Schüttler 2024-08-28 11:16:35 +02:00
commit 381c0a7e4b
11 changed files with 133 additions and 62 deletions

View File

@ -2,8 +2,10 @@ package com.knecon.fforesight.llm.service;
public class QueueNames {
public static final String LLM_NER_SERVICE_QUEUE = "llm_entity_request_queue";
public static final String LLM_NER_SERVICE_RESPONSE_QUEUE = "llm_entity_response_queue";
public static final String LLM_NER_SERVICE_DLQ = "llm_entity_dead_letter_queue";
public static final String LLM_NER_REQUEST_QUEUE_PREFIX = "llm_entity_request_queue";
public static final String LLM_NER_REQUEST_EXCHANGE = "llm_entity_request_exchange";
public static final String LLM_NER_RESPONSE_QUEUE_PREFIX = "llm_entity_response_queue";
public static final String LLM_NER_RESPONSE_EXCHANGE = "llm_entity_response_exchange";
public static final String LLM_NER_DLQ = "llm_entity_dlq";
}

View File

@ -13,10 +13,10 @@ public class SystemMessages {
The output should be strictly JSON format and nothing else, formatted as such:
```
{
PII: ["Jennifer Durando, BS", "01223 45678", "mimi.lang@smithcorp.com", "+44 (0)1252 392460"],
ADDRESS: ["Product Safety Labs 2394 US Highway 130 Dayton, NJ 08810 USA", "Syngenta Crop Protection, LLC 410 Swing Road Post Office Box 18300 Greensboro, NC 27419-8300 USA"]
COMPANY: ["Syngenta", "EFSA"]
COUNTRY: ["USA"]
"PII": ["Jennifer Durando, BS", "01223 45678", "mimi.lang@smithcorp.com", "+44 (0)1252 392460"],
"ADDRESS": ["Product Safety Labs 2394 US Highway 130 Dayton, NJ 08810 USA", "Syngenta Crop Protection, LLC 410 Swing Road Post Office Box 18300 Greensboro, NC 27419-8300 USA"]
"COMPANY": ["Syngenta", "EFSA"]
"COUNTRY": ["USA"]
}
```
Always replace linebreaks with whitespaces, but except that, ensure the entities match the text in the document exactly.

View File

@ -74,19 +74,20 @@ public class LlmNerService {
Document document = buildDocument(llmNerMessage);
ChunkingResponse chunks = readChunks(llmNerMessage.getChunksStorageId());
List<LlmNerEntity> allEntities = Collections.synchronizedList(new LinkedList<>());
log.info("Finished data prep for {}", llmNerMessage.getIdentifier());
List<CompletableFuture<EntitiesWithUsage>> entityFutures = chunks.getData()
.stream()
.map(chunk -> getLlmNerEntitiesFuture(chunk, document))
.toList();
log.info("Awaiting api calls for {}", llmNerMessage.getIdentifier());
for (CompletableFuture<EntitiesWithUsage> entityFuture : entityFutures) {
EntitiesWithUsage entitiesWithUsage = entityFuture.get();
allEntities.addAll(entitiesWithUsage.entities());
completionTokenCount += entitiesWithUsage.completionsUsage().getCompletionTokens();
promptTokenCount += entitiesWithUsage.completionsUsage().getPromptTokens();
}
log.info("Storing files for {}", llmNerMessage.getIdentifier());
storageService.storeJSONObject(TenantContext.getTenantId(), llmNerMessage.getResultStorageId(), new LlmNerEntities(allEntities));
long duration = System.currentTimeMillis() - start;
log.info("Found {} named entities for {} in {} with {} prompt tokens and {} completion tokens.",

View File

@ -2,7 +2,6 @@ package com.knecon.fforesight.llm.service.services;
import java.util.concurrent.Semaphore;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.azure.ai.openai.OpenAIAsyncClient;
@ -10,7 +9,7 @@ import com.azure.ai.openai.OpenAIClient;
import com.azure.ai.openai.OpenAIClientBuilder;
import com.azure.ai.openai.models.ChatCompletions;
import com.azure.ai.openai.models.ChatCompletionsOptions;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.credential.KeyCredential;
import com.knecon.fforesight.llm.service.LlmServiceSettings;
import lombok.AccessLevel;
@ -29,12 +28,14 @@ public class LlmRessource {
Semaphore concurrencyLimitingSemaphore;
public LlmRessource(@Value("${llm-service.azureOpenAiEndpoint}") String azureEndpoint, @Value("${llm-service.azureOpenAiKey}") String azureKey, LlmServiceSettings settings) {
public LlmRessource(LlmServiceSettings settings) {
this.settings = settings;
this.concurrencyLimitingSemaphore = new Semaphore(settings.getConcurrency());
this.asyncClient = new OpenAIClientBuilder().credential(new AzureKeyCredential(azureKey)).endpoint(azureEndpoint).buildAsyncClient();
this.client = new OpenAIClientBuilder().credential(new AzureKeyCredential(azureKey)).endpoint(azureEndpoint).buildClient();
this.asyncClient = new OpenAIClientBuilder().credential(new KeyCredential(settings.getAzureOpenAiKey())).endpoint(settings.getAzureOpenAiEndpoint()).buildAsyncClient();
this.client = new OpenAIClientBuilder().credential(new KeyCredential(settings.getAzureOpenAiKey())).endpoint(settings.getAzureOpenAiEndpoint()).buildClient();
log.info("Initialized client for endpoint {} and key {}", settings.getAzureOpenAiEndpoint(), settings.getAzureOpenAiKey());
}
@ -50,13 +51,8 @@ public class LlmRessource {
concurrencyLimitingSemaphore.acquire();
ChatCompletions chatCompletions = client.getChatCompletions(settings.getModel(), options);
concurrencyLimitingSemaphore.release();
return chatCompletions;
}
public int getCurrentConcurrency() {
return settings.getConcurrency() - concurrencyLimitingSemaphore.availablePermits();
}
}

View File

@ -32,7 +32,8 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-websocket:$springBootVersion")
implementation("org.springframework.security:spring-security-messaging:$springSecurityVersion")
implementation("com.iqser.red.commons:storage-commons:2.49.0")
implementation("com.knecon.fforesight:keycloak-commons:0.29.0")
implementation("com.knecon.fforesight:keycloak-commons:0.30.0")
implementation("com.knecon.fforesight:tenant-commons:0.28.0")
implementation("com.knecon.fforesight:swagger-commons:0.7.0")
implementation("ch.qos.logback:logback-classic")

View File

@ -1,8 +1,5 @@
package com.knecon.fforesight.llm.service.queue;
import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_QUEUE;
import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
@ -13,11 +10,12 @@ import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.knecon.fforesight.llm.service.LlmNerMessage;
import com.knecon.fforesight.llm.service.LlmNerResponseMessage;
import com.knecon.fforesight.llm.service.QueueNames;
import com.knecon.fforesight.llm.service.services.LlmNerService;
import com.knecon.fforesight.tenantcommons.TenantContext;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
@ -27,36 +25,44 @@ import lombok.extern.slf4j.Slf4j;
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class MessageHandler {
public static final String LLM_NER_REQUEST_LISTENER_ID = "llm-ner-request-listener";
LlmNerService llmNerService;
ObjectMapper mapper;
RabbitTemplate rabbitTemplate;
@RabbitHandler
@RabbitListener(queues = LLM_NER_SERVICE_QUEUE)
@RabbitListener(id = LLM_NER_REQUEST_LISTENER_ID, concurrency = "1")
public void receiveNerRequest(Message message) {
if (message.getMessageProperties().isRedelivered()) {
throw new AmqpRejectAndDontRequeueException("Redelivered OCR Request, aborting...");
throw new AmqpRejectAndDontRequeueException("Redelivered LLM NER Request, aborting...");
}
LlmNerMessage llmNerMessage = parseLlmNerMessage(message);
log.info("Starting NER with LLM for {}", llmNerMessage.getIdentifier());
LlmNerService.Usage usage = llmNerService.runNer(llmNerMessage);
LlmNerResponseMessage llmNerResponseMessage = new LlmNerResponseMessage(llmNerMessage.getIdentifier(),
usage.completionTokenCount(),
usage.promptTokenCount(),
usage.completionTokenCount(),
Math.toIntExact(usage.durationMillis()));
rabbitTemplate.convertAndSend(LLM_NER_SERVICE_RESPONSE_QUEUE, llmNerResponseMessage);
log.info("LLM NER finished for {}", llmNerMessage.getIdentifier());
rabbitTemplate.convertAndSend(QueueNames.LLM_NER_RESPONSE_EXCHANGE, TenantContext.getTenantId(), llmNerResponseMessage);
}
@SneakyThrows
private LlmNerMessage parseLlmNerMessage(Message message) {
return mapper.readValue(message.getBody(), LlmNerMessage.class);
try {
return mapper.readValue(message.getBody(), LlmNerMessage.class);
} catch (Exception e) {
log.error("Failed to parse LLM NER message:\n {}", new String(message.getBody()));
throw new RuntimeException(e);
}
}
}

View File

@ -1,8 +1,6 @@
package com.knecon.fforesight.llm.service.queue;
import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_DLQ;
import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_QUEUE;
import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_SERVICE_RESPONSE_QUEUE;
import static com.knecon.fforesight.llm.service.QueueNames.LLM_NER_DLQ;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
@ -15,27 +13,10 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class MessagingConfiguration {
@Bean
public Queue llmNerRequestQueue() {
return QueueBuilder.durable(LLM_NER_SERVICE_QUEUE).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ).build();
}
@Bean
public Queue llmNerResponseQueue() {
return QueueBuilder.durable(LLM_NER_SERVICE_RESPONSE_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", LLM_NER_SERVICE_DLQ)
.build();
}
@Bean
public Queue llmNerResponseDLQ() {
return QueueBuilder.durable(LLM_NER_SERVICE_DLQ).build();
return QueueBuilder.durable(LLM_NER_DLQ).build();
}
}

View File

@ -0,0 +1,67 @@
package com.knecon.fforesight.llm.service.queue;
import java.util.Map;
import java.util.Set;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import com.knecon.fforesight.llm.service.QueueNames;
import com.knecon.fforesight.tenantcommons.TenantProvider;
import com.knecon.fforesight.tenantcommons.model.TenantCreatedEvent;
import com.knecon.fforesight.tenantcommons.model.TenantQueueConfiguration;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import com.knecon.fforesight.tenantcommons.queue.RabbitQueueFromExchangeService;
import com.knecon.fforesight.tenantcommons.queue.TenantExchangeMessageReceiver;
@Service
public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageReceiver {
public TenantExchangeMessageReceiverImpl(RabbitQueueFromExchangeService rabbitQueueService, TenantProvider tenantProvider) {
super(rabbitQueueService, tenantProvider);
}
@Override
protected Set<TenantQueueConfiguration> getTenantQueueConfigs() {
return Set.of(TenantQueueConfiguration.builder()
.listenerId(MessageHandler.LLM_NER_REQUEST_LISTENER_ID)
.exchangeName(QueueNames.LLM_NER_REQUEST_EXCHANGE)
.queuePrefix(QueueNames.LLM_NER_REQUEST_QUEUE_PREFIX)
.dlqName(QueueNames.LLM_NER_DLQ)
.arguments(Map.of("x-max-priority", 2))
.build());
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
System.out.println("application ready invoked");
super.initializeQueues();
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantCreatedQueueName()}")
public void reactToTenantCreation(TenantCreatedEvent tenantCreatedEvent) {
super.reactToTenantCreation(tenantCreatedEvent);
}
@RabbitHandler
@RabbitListener(queues = "#{tenantMessagingConfigurationImpl.getTenantDeletedQueueName()}")
public void reactToTenantDeletion(TenantResponse tenantResponse) {
super.reactToTenantDeletion(tenantResponse);
}
}

View File

@ -0,0 +1,11 @@
package com.knecon.fforesight.llm.service.queue;
import org.springframework.context.annotation.Configuration;
import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration;
@Configuration
public class TenantMessagingConfigurationImpl extends TenantMessagingConfiguration {
}

View File

@ -6,10 +6,11 @@ import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
import org.springframework.boot.test.context.SpringBootTest;
@ -27,12 +28,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.storage.commons.StorageAutoConfiguration;
import com.iqser.red.storage.commons.service.StorageService;
import com.iqser.red.storage.commons.utils.FileSystemBackedStorageService;
import com.knecon.fforesight.keycloakcommons.DefaultKeyCloakCommonsAutoConfiguration;
import com.knecon.fforesight.swaggercommons.SpringDocAutoConfiguration;
import com.knecon.fforesight.tenantcommons.MultiTenancyAutoConfiguration;
import com.knecon.fforesight.tenantcommons.TenantContext;
import com.knecon.fforesight.tenantcommons.TenantsClient;
import com.knecon.fforesight.tenantcommons.model.TenantResponse;
import com.knecon.fforesight.tenantcommons.queue.TenantMessagingConfiguration;
@ComponentScan
@ExtendWith(SpringExtension.class)
@ -47,16 +46,22 @@ public abstract class AbstractLlmServiceIntegrationTest {
protected StorageService storageService;
@MockBean
TenantsClient tenantsClient;
@Autowired
ObjectMapper objectMapper;
@MockBean
RabbitTemplate rabbitTemplate;
@MockBean
RabbitAdmin rabbitAdmin;
@MockBean
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@MockBean
TenantMessagingConfiguration tenantMessagingConfiguration;
@BeforeEach
public void setupOptimize() {
var tenant = TenantResponse.builder()
.tenantId(TEST_TENANT)
.build();
var tenant = TenantResponse.builder().tenantId(TEST_TENANT).build();
TenantContext.setTenantId(TEST_TENANT);
@ -68,7 +73,6 @@ public abstract class AbstractLlmServiceIntegrationTest {
@SuppressWarnings("PMD.TestClassWithoutTestCases")
@Configuration
@EnableAutoConfiguration(exclude = {RabbitAutoConfiguration.class})
@ImportAutoConfiguration({StorageAutoConfiguration.class, MultiTenancyAutoConfiguration.class, SpringDocAutoConfiguration.class, DefaultKeyCloakCommonsAutoConfiguration.class})
@ComponentScan(excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = StorageAutoConfiguration.class)})
public static class TestConfiguration {

View File

@ -4,6 +4,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.Disabled;
@ -33,7 +34,7 @@ public class LlmNerServiceTest extends AbstractLlmServiceIntegrationTest {
@SneakyThrows
public void testLlmNer() {
Path folder = Path.of("/home/kschuettler/Downloads/New Folder (2)/2f4cc06f-d941-4f87-8928-b5d8a9476387/75ecec8c698f561c91d1a3e9f81dad7c");
Path folder = Path.of("/home/kschuettler/Downloads/New Folder (5)/18299ec0-7659-496a-a44a-194bbffb1700/1fb7d49ae389469f60db516cf81a3510");
LlmNerMessage message = prepStorage(folder);
llmNerService.runNer(message);
Path tmpFile = Path.of("tmp", "AAA_LLM_ENTITIES", "entities.json");
@ -71,6 +72,7 @@ public class LlmNerServiceTest extends AbstractLlmServiceIntegrationTest {
private static LlmNerMessage buildMessage(Path folder) {
return LlmNerMessage.builder()
.identifier(Map.of("file", folder.getFileName().toString()))
.chunksStorageId(folder + DOCUMENT_CHUNKS)
.documentPagesStorageId(folder + DOCUMENT_PAGES)
.documentTextStorageId(folder + DOCUMENT_TEXT)