From fe7b43c32aaf8990947fea2cbdf162259bf3cad9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominique=20Eifl=C3=A4nder?= Date: Tue, 2 Jul 2024 14:41:03 +0200 Subject: [PATCH] RED-9500: Use Redis pub/sub to syncronize websocket topic messages accross multiple pods --- .../config/RedisPubsubConfiguration.java | 42 +++++++++++++++++++ .../v1/server/logger/RulesLogger.java | 3 +- .../ComponentDroolsExecutionService.java | 4 +- .../drools/EntityDroolsExecutionService.java | 4 +- .../websocket/RedisPubsubReceiver.java | 22 ++++++++++ .../RedisSyncedWebSocketService.java | 25 +++++++++++ .../service/websocket/WebSocketMessage.java | 17 ++++++++ .../{ => websocket}/WebSocketService.java | 7 ++-- .../AbstractRedactionIntegrationTest.java | 8 ++++ .../v1/server/DictionaryServiceTest.java | 8 ++++ .../redaction/v1/server/RulesTest.java | 6 +++ .../v1/server/logger/RulesLoggerTest.java | 2 +- .../realdata/LiveDataIntegrationTest.java | 8 ++++ 13 files changed, 144 insertions(+), 12 deletions(-) create mode 100644 redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/config/RedisPubsubConfiguration.java create mode 100644 redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisPubsubReceiver.java create mode 100644 redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisSyncedWebSocketService.java create mode 100644 redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketMessage.java rename redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/{ => websocket}/WebSocketService.java (75%) diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/config/RedisPubsubConfiguration.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/config/RedisPubsubConfiguration.java new file mode 100644 index 00000000..949286f4 --- /dev/null +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/config/RedisPubsubConfiguration.java @@ -0,0 +1,42 @@ +package com.iqser.red.service.redaction.v1.server.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; +import org.springframework.messaging.simp.SimpMessagingTemplate; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.iqser.red.service.redaction.v1.server.service.websocket.RedisPubsubReceiver; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class RedisPubsubConfiguration { + + private final SimpMessagingTemplate template; + private final ObjectMapper mapper; + private final RedisConnectionFactory connectionFactory; + + @Bean + public RedisPubsubReceiver redisPubsubReceiver() { + return new RedisPubsubReceiver(template, mapper); + } + + @Bean + public MessageListenerAdapter redisPubsubListenerAdapter() { + return new MessageListenerAdapter(redisPubsubReceiver(), "receiveMessage"); + } + + @Bean + public RedisMessageListenerContainer redisPubsubContainer() { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(redisPubsubListenerAdapter(), new PatternTopic("redaction-service-websocket-messages")); + return container; + } + +} \ No newline at end of file diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/logger/RulesLogger.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/logger/RulesLogger.java index 702f7fcc..abd260e8 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/logger/RulesLogger.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/logger/RulesLogger.java @@ -3,10 +3,9 @@ package com.iqser.red.service.redaction.v1.server.logger; import java.time.OffsetDateTime; import java.util.regex.Pattern; -import com.iqser.red.service.redaction.v1.server.service.WebSocketService; +import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService; import lombok.RequiredArgsConstructor; -import lombok.Setter; @RequiredArgsConstructor public class RulesLogger { diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/ComponentDroolsExecutionService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/ComponentDroolsExecutionService.java index c93340de..7c385df5 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/ComponentDroolsExecutionService.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/ComponentDroolsExecutionService.java @@ -9,8 +9,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.drools.model.Global; -import org.kie.api.definition.KiePackage; import org.kie.api.runtime.KieContainer; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.rule.QueryResults; @@ -29,7 +27,7 @@ import com.iqser.red.service.redaction.v1.server.logger.RulesLogger; import com.iqser.red.service.redaction.v1.server.model.component.Component; import com.iqser.red.service.redaction.v1.server.model.component.Entity; import com.iqser.red.service.redaction.v1.server.model.document.nodes.Document; -import com.iqser.red.service.redaction.v1.server.service.WebSocketService; +import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService; import com.iqser.red.service.redaction.v1.server.service.components.ComponentMappingMemoryCache; import com.iqser.red.service.redaction.v1.server.service.components.ComponentMappingService; import com.iqser.red.service.redaction.v1.server.service.document.ComponentComparator; diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/EntityDroolsExecutionService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/EntityDroolsExecutionService.java index 42675b01..b8c817f6 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/EntityDroolsExecutionService.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/drools/EntityDroolsExecutionService.java @@ -10,8 +10,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.drools.model.Global; -import org.kie.api.KieBase; import org.kie.api.runtime.KieContainer; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.rule.QueryResults; @@ -30,7 +28,7 @@ import com.iqser.red.service.redaction.v1.server.model.dictionary.Dictionary; import com.iqser.red.service.redaction.v1.server.model.document.nodes.Document; import com.iqser.red.service.redaction.v1.server.model.document.nodes.SemanticNode; import com.iqser.red.service.redaction.v1.server.service.ManualChangesApplicationService; -import com.iqser.red.service.redaction.v1.server.service.WebSocketService; +import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService; import com.iqser.red.service.redaction.v1.server.service.document.EntityCreationService; import com.iqser.red.service.redaction.v1.server.service.document.EntityEnrichmentService; import com.iqser.red.service.redaction.v1.server.utils.exception.DroolsTimeoutException; diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisPubsubReceiver.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisPubsubReceiver.java new file mode 100644 index 00000000..e36460d1 --- /dev/null +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisPubsubReceiver.java @@ -0,0 +1,22 @@ +package com.iqser.red.service.redaction.v1.server.service.websocket; + +import org.springframework.messaging.simp.SimpMessagingTemplate; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; + +@RequiredArgsConstructor +public class RedisPubsubReceiver { + + private final SimpMessagingTemplate websocketTemplate; + private final ObjectMapper mapper; + + @SneakyThrows + public void receiveMessage(String message) { + var data = mapper.readValue(message, WebSocketMessage.class); + websocketTemplate.convertAndSend(data.getTopic(), data.getMessage()); + } + +} \ No newline at end of file diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisSyncedWebSocketService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisSyncedWebSocketService.java new file mode 100644 index 00000000..fa70ef14 --- /dev/null +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/RedisSyncedWebSocketService.java @@ -0,0 +1,25 @@ +package com.iqser.red.service.redaction.v1.server.service.websocket; + +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; + +@Service +@RequiredArgsConstructor +public class RedisSyncedWebSocketService { + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper mapper; + + @SneakyThrows + public void convertAndSend(String topic, Object message) { + var socketData = new WebSocketMessage(topic, mapper.writeValueAsString(message)); + String data = mapper.writeValueAsString(socketData); + redisTemplate.convertAndSend("redaction-service-websocket-messages", data); + } + +} \ No newline at end of file diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketMessage.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketMessage.java new file mode 100644 index 00000000..33ce9694 --- /dev/null +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketMessage.java @@ -0,0 +1,17 @@ +package com.iqser.red.service.redaction.v1.server.service.websocket; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WebSocketMessage { + + private String topic; + private String message; + +} diff --git a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/WebSocketService.java b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketService.java similarity index 75% rename from redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/WebSocketService.java rename to redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketService.java index a14295fa..c5774950 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/WebSocketService.java +++ b/redaction-service-v1/redaction-service-server-v1/src/main/java/com/iqser/red/service/redaction/v1/server/service/websocket/WebSocketService.java @@ -1,6 +1,5 @@ -package com.iqser.red.service.redaction.v1.server.service; +package com.iqser.red.service.redaction.v1.server.service.websocket; -import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import com.iqser.red.service.redaction.v1.server.logger.RuleLogEvent; @@ -13,7 +12,8 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public class WebSocketService { - private final SimpMessagingTemplate template; + private final RedisSyncedWebSocketService template; + public void sendLogEvent(RuleLogEvent ruleLogEvent) { @@ -21,4 +21,5 @@ public class WebSocketService { log.info("Sending message to url {}", destination); template.convertAndSend(destination, ruleLogEvent); } + } diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/AbstractRedactionIntegrationTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/AbstractRedactionIntegrationTest.java index 53f69695..8154d748 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/AbstractRedactionIntegrationTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/AbstractRedactionIntegrationTest.java @@ -34,6 +34,7 @@ import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.test.context.ContextConfiguration; import com.fasterxml.jackson.databind.ObjectMapper; @@ -52,6 +53,7 @@ import com.iqser.red.service.redaction.v1.server.client.RulesClient; import com.iqser.red.service.redaction.v1.server.controller.RedactionController; import com.iqser.red.service.redaction.v1.server.service.AnalyzeService; import com.iqser.red.service.redaction.v1.server.service.UnprocessedChangesService; +import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService; import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService; import com.iqser.red.service.redaction.v1.server.testcontainers.MongoDBTestContainer; import com.iqser.red.service.redaction.v1.server.utils.LayoutParsingRequestProvider; @@ -167,6 +169,12 @@ public abstract class AbstractRedactionIntegrationTest { @MockBean private TenantProvider tenantProvider; + @MockBean + private RedisSyncedWebSocketService redisSyncedWebSocketService; + + @MockBean + private RedisMessageListenerContainer redisPubsubContainer; + @Autowired protected MongoTestConfig mongoTestConfig; diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/DictionaryServiceTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/DictionaryServiceTest.java index b7fb9300..6738388c 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/DictionaryServiceTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/DictionaryServiceTest.java @@ -25,6 +25,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.test.context.junit.jupiter.SpringExtension; import com.google.common.collect.Sets; @@ -36,6 +37,7 @@ import com.iqser.red.service.persistence.service.v1.api.shared.model.dossiertemp import com.iqser.red.service.redaction.v1.server.client.DictionaryClient; import com.iqser.red.service.redaction.v1.server.model.dictionary.DictionaryVersion; import com.iqser.red.service.redaction.v1.server.service.DictionaryService; +import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService; import com.iqser.red.storage.commons.service.StorageService; import com.iqser.red.storage.commons.utils.FileSystemBackedStorageService; import com.knecon.fforesight.keycloakcommons.security.TenantAuthenticationManagerResolver; @@ -53,6 +55,12 @@ public class DictionaryServiceTest { @MockBean private TenantsClient tenantsClient; + @MockBean + private RedisSyncedWebSocketService redisSyncedWebSocketService; + + @MockBean + private RedisMessageListenerContainer redisPubsubContainer; + @MockBean protected KieContainer kieContainer; diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RulesTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RulesTest.java index 5af5a034..56ce9aaf 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RulesTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/RulesTest.java @@ -53,6 +53,7 @@ import org.springframework.context.annotation.FilterType; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.test.context.junit.jupiter.SpringExtension; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -81,6 +82,7 @@ import com.iqser.red.service.redaction.v1.server.client.LegalBasisClient; import com.iqser.red.service.redaction.v1.server.client.RulesClient; import com.iqser.red.service.redaction.v1.server.controller.RedactionController; import com.iqser.red.service.redaction.v1.server.service.AnalyzeService; +import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService; import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService; import com.iqser.red.service.redaction.v1.server.utils.LayoutParsingRequestProvider; import com.iqser.red.service.redaction.v1.server.utils.ResourceLoader; @@ -257,6 +259,10 @@ public class RulesTest { private TenantsClient tenantsClient; @MockBean private TenantAuthenticationManagerResolver tenantAuthenticationManagerResolver; + @MockBean + private RedisSyncedWebSocketService redisSyncedWebSocketService; + @MockBean + private RedisMessageListenerContainer redisPubsubContainer; @BeforeEach diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/logger/RulesLoggerTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/logger/RulesLoggerTest.java index 1e3a196f..7ac8b58c 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/logger/RulesLoggerTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/logger/RulesLoggerTest.java @@ -21,7 +21,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.iqser.red.service.redaction.v1.server.service.WebSocketService; +import com.iqser.red.service.redaction.v1.server.service.websocket.WebSocketService; @ExtendWith(MockitoExtension.class) class RulesLoggerTest { diff --git a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/LiveDataIntegrationTest.java b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/LiveDataIntegrationTest.java index 7996910a..3f845ec5 100644 --- a/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/LiveDataIntegrationTest.java +++ b/redaction-service-v1/redaction-service-server-v1/src/test/java/com/iqser/red/service/redaction/v1/server/realdata/LiveDataIntegrationTest.java @@ -34,6 +34,7 @@ import org.springframework.context.annotation.Primary; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; import org.springframework.core.io.support.ResourcePatternResolver; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.test.context.junit.jupiter.SpringExtension; import com.fasterxml.jackson.core.type.TypeReference; @@ -52,6 +53,7 @@ import com.iqser.red.service.redaction.v1.server.client.LegalBasisClient; import com.iqser.red.service.redaction.v1.server.client.RulesClient; import com.iqser.red.service.redaction.v1.server.queue.RedactionMessageReceiver; import com.iqser.red.service.redaction.v1.server.service.DictionaryService; +import com.iqser.red.service.redaction.v1.server.service.websocket.RedisSyncedWebSocketService; import com.iqser.red.service.redaction.v1.server.storage.RedactionStorageService; import com.iqser.red.service.redaction.v1.server.utils.ExceptionProvider; import com.iqser.red.storage.commons.StorageAutoConfiguration; @@ -93,6 +95,12 @@ public class LiveDataIntegrationTest { @MockBean private LegalBasisClient legalBasisClient; + @MockBean + private RedisSyncedWebSocketService redisSyncedWebSocketService; + + @MockBean + private RedisMessageListenerContainer redisPubsubContainer; + @MockBean protected TenantAuthenticationManagerResolver tenantAuthenticationManagerResolver;