Merge branch 'RED-9500' into 'master'

RED-9500: Use Redis pub/sub to syncronize websocket topic messages accross multiple pods

Closes RED-9500

See merge request redactmanager/redaction-service!455
This commit is contained in:
Dominique Eifländer 2024-07-02 15:53:51 +02:00
commit 71b2dda7cd
13 changed files with 144 additions and 12 deletions

View File

@ -0,0 +1,42 @@
package com.iqser.red.service.redaction.v1.server.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iqser.red.service.redaction.v1.server.service.websocket.RedisPubsubReceiver;
import lombok.RequiredArgsConstructor;
@Configuration
@RequiredArgsConstructor
public class RedisPubsubConfiguration {
private final SimpMessagingTemplate template;
private final ObjectMapper mapper;
private final RedisConnectionFactory connectionFactory;
@Bean
public RedisPubsubReceiver redisPubsubReceiver() {
return new RedisPubsubReceiver(template, mapper);
}
@Bean
public MessageListenerAdapter redisPubsubListenerAdapter() {
return new MessageListenerAdapter(redisPubsubReceiver(), "receiveMessage");
}
@Bean
public RedisMessageListenerContainer redisPubsubContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisPubsubListenerAdapter(), new PatternTopic("redaction-service-websocket-messages"));
return container;
}
}

View File

@ -3,10 +3,9 @@ package com.iqser.red.service.redaction.v1.server.logger;
import java.time.OffsetDateTime;
import java.util.regex.Pattern;
import com.iqser.red.service.redaction.v1.server.service.WebSocketService;
import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@RequiredArgsConstructor
public class RulesLogger {

View File

@ -9,8 +9,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.drools.model.Global;
import org.kie.api.definition.KiePackage;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.QueryResults;
@ -29,7 +27,7 @@ import com.iqser.red.service.redaction.v1.server.logger.RulesLogger;
import com.iqser.red.service.redaction.v1.server.model.component.Component;
import com.iqser.red.service.redaction.v1.server.model.component.Entity;
import com.iqser.red.service.redaction.v1.server.model.document.nodes.Document;
import com.iqser.red.service.redaction.v1.server.service.WebSocketService;
import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService;
import com.iqser.red.service.redaction.v1.server.service.components.ComponentMappingMemoryCache;
import com.iqser.red.service.redaction.v1.server.service.components.ComponentMappingService;
import com.iqser.red.service.redaction.v1.server.service.document.ComponentComparator;

View File

@ -10,8 +10,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.drools.model.Global;
import org.kie.api.KieBase;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.QueryResults;
@ -30,7 +28,7 @@ import com.iqser.red.service.redaction.v1.server.model.dictionary.Dictionary;
import com.iqser.red.service.redaction.v1.server.model.document.nodes.Document;
import com.iqser.red.service.redaction.v1.server.model.document.nodes.SemanticNode;
import com.iqser.red.service.redaction.v1.server.service.ManualChangesApplicationService;
import com.iqser.red.service.redaction.v1.server.service.WebSocketService;
import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService;
import com.iqser.red.service.redaction.v1.server.service.document.EntityCreationService;
import com.iqser.red.service.redaction.v1.server.service.document.EntityEnrichmentService;
import com.iqser.red.service.redaction.v1.server.utils.exception.DroolsTimeoutException;

View File

@ -0,0 +1,22 @@
package com.iqser.red.service.redaction.v1.server.service.websocket;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@RequiredArgsConstructor
public class RedisPubsubReceiver {
private final SimpMessagingTemplate websocketTemplate;
private final ObjectMapper mapper;
@SneakyThrows
public void receiveMessage(String message) {
var data = mapper.readValue(message, WebSocketMessage.class);
websocketTemplate.convertAndSend(data.getTopic(), data.getMessage());
}
}

View File

@ -0,0 +1,25 @@
package com.iqser.red.service.redaction.v1.server.service.websocket;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@Service
@RequiredArgsConstructor
public class RedisSyncedWebSocketService {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper mapper;
@SneakyThrows
public void convertAndSend(String topic, Object message) {
var socketData = new WebSocketMessage(topic, mapper.writeValueAsString(message));
String data = mapper.writeValueAsString(socketData);
redisTemplate.convertAndSend("redaction-service-websocket-messages", data);
}
}

View File

@ -0,0 +1,17 @@
package com.iqser.red.service.redaction.v1.server.service.websocket;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebSocketMessage {
private String topic;
private String message;
}

View File

@ -1,6 +1,5 @@
package com.iqser.red.service.redaction.v1.server.service;
package com.iqser.red.service.redaction.v1.server.service.websocket;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import com.iqser.red.service.redaction.v1.server.logger.RuleLogEvent;
@ -13,7 +12,8 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public class WebSocketService {
private final SimpMessagingTemplate template;
private final RedisSyncedWebSocketService template;
public void sendLogEvent(RuleLogEvent ruleLogEvent) {
@ -21,4 +21,5 @@ public class WebSocketService {
log.info("Sending message to url {}", destination);
template.convertAndSend(destination, ruleLogEvent);
}
}

View File

@ -34,6 +34,7 @@ import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.test.context.ContextConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -52,6 +53,7 @@ import com.iqser.red.service.redaction.v1.server.client.RulesClient;
import com.iqser.red.service.redaction.v1.server.controller.RedactionController;
import com.iqser.red.service.redaction.v1.server.service.AnalyzeService;
import com.iqser.red.service.redaction.v1.server.service.UnprocessedChangesService;
import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService;
import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService;
import com.iqser.red.service.redaction.v1.server.testcontainers.MongoDBTestContainer;
import com.iqser.red.service.redaction.v1.server.utils.LayoutParsingRequestProvider;
@ -167,6 +169,12 @@ public abstract class AbstractRedactionIntegrationTest {
@MockBean
private TenantProvider tenantProvider;
@MockBean
private RedisSyncedWebSocketService redisSyncedWebSocketService;
@MockBean
private RedisMessageListenerContainer redisPubsubContainer;
@Autowired
protected MongoTestConfig mongoTestConfig;

View File

@ -25,6 +25,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import com.google.common.collect.Sets;
@ -36,6 +37,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp
import com.iqser.red.service.redaction.v1.server.client.DictionaryClient;
import com.iqser.red.service.redaction.v1.server.model.dictionary.DictionaryVersion;
import com.iqser.red.service.redaction.v1.server.service.DictionaryService;
import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService;
import com.iqser.red.storage.commons.service.StorageService;
import com.iqser.red.storage.commons.utils.FileSystemBackedStorageService;
import com.knecon.fforesight.keycloakcommons.security.TenantAuthenticationManagerResolver;
@ -53,6 +55,12 @@ public class DictionaryServiceTest {
@MockBean
private TenantsClient tenantsClient;
@MockBean
private RedisSyncedWebSocketService redisSyncedWebSocketService;
@MockBean
private RedisMessageListenerContainer redisPubsubContainer;
@MockBean
protected KieContainer kieContainer;

View File

@ -53,6 +53,7 @@ import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import com.fasterxml.jackson.databind.DeserializationFeature;
@ -81,6 +82,7 @@ import com.iqser.red.service.redaction.v1.server.client.LegalBasisClient;
import com.iqser.red.service.redaction.v1.server.client.RulesClient;
import com.iqser.red.service.redaction.v1.server.controller.RedactionController;
import com.iqser.red.service.redaction.v1.server.service.AnalyzeService;
import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService;
import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService;
import com.iqser.red.service.redaction.v1.server.utils.LayoutParsingRequestProvider;
import com.iqser.red.service.redaction.v1.server.utils.ResourceLoader;
@ -257,6 +259,10 @@ public class RulesTest {
private TenantsClient tenantsClient;
@MockBean
private TenantAuthenticationManagerResolver tenantAuthenticationManagerResolver;
@MockBean
private RedisSyncedWebSocketService redisSyncedWebSocketService;
@MockBean
private RedisMessageListenerContainer redisPubsubContainer;
@BeforeEach

View File

@ -21,7 +21,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import com.iqser.red.service.redaction.v1.server.service.WebSocketService;
import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService;
@ExtendWith(MockitoExtension.class)
class RulesLoggerTest {

View File

@ -34,6 +34,7 @@ import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import com.fasterxml.jackson.core.type.TypeReference;
@ -52,6 +53,7 @@ import com.iqser.red.service.redaction.v1.server.client.LegalBasisClient;
import com.iqser.red.service.redaction.v1.server.client.RulesClient;
import com.iqser.red.service.redaction.v1.server.queue.RedactionMessageReceiver;
import com.iqser.red.service.redaction.v1.server.service.DictionaryService;
import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService;
import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService;
import com.iqser.red.service.redaction.v1.server.utils.ExceptionProvider;
import com.iqser.red.storage.commons.StorageAutoConfiguration;
@ -93,6 +95,12 @@ public class LiveDataIntegrationTest {
@MockBean
private LegalBasisClient legalBasisClient;
@MockBean
private RedisSyncedWebSocketService redisSyncedWebSocketService;
@MockBean
private RedisMessageListenerContainer redisPubsubContainer;
@MockBean
protected TenantAuthenticationManagerResolver tenantAuthenticationManagerResolver;