Tenants retry logic and queue renames
This commit is contained in:
parent
f7a549c1a3
commit
380b62333c
@ -22,7 +22,7 @@ configurations {
|
||||
val springBootStarterVersion = "3.1.5"
|
||||
|
||||
dependencies {
|
||||
api("com.knecon.fforesight:tenant-commons:0.28.0")
|
||||
api("com.knecon.fforesight:tenant-commons:0.29.0")
|
||||
api("com.knecon.fforesight:tracing-commons:0.5.0")
|
||||
api("com.knecon.fforesight:lifecycle-commons:0.6.0")
|
||||
api("com.google.guava:guava:31.1-jre")
|
||||
|
||||
@ -17,32 +17,32 @@ import lombok.RequiredArgsConstructor;
|
||||
@RequiredArgsConstructor
|
||||
public class MessagingConfiguration {
|
||||
|
||||
public static final String INDEXING_QUEUE_PREFIX = "indexing_queue";
|
||||
public static final String INDEXING_EXCHANGE = "indexing_exchange";
|
||||
public static final String INDEXING_DLQ = "indexing_dlq";
|
||||
public static final String INDEXING_REQUEST_QUEUE_PREFIX = "indexing_request";
|
||||
public static final String INDEXING_REQUEST_EXCHANGE = "indexing_request_exchange";
|
||||
public static final String INDEXING_DLQ = "indexing_error";
|
||||
|
||||
public static final String DELETE_FROM_INDEX_QUEUE_PREFIX = "delete_from_index_queue";
|
||||
public static final String DELETE_FROM_INDEX_EXCHANGE = "delete_from_index_exchange";
|
||||
public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_dlq";
|
||||
public static final String DELETE_FROM_INDEX_REQUEST_QUEUE_PREFIX = "delete_from_index_request";
|
||||
public static final String DELETE_FROM_INDEX_REQUEST_EXCHANGE = "delete_from_index_request_exchange";
|
||||
public static final String DELETE_FROM_INDEX_DLQ = "delete_from_index_error";
|
||||
|
||||
public static final String X_ERROR_INFO_HEADER = "x-error-message";
|
||||
public static final String X_ERROR_INFO_TIMESTAMP_HEADER = "x-error-message-timestamp";
|
||||
|
||||
@Value("${fforesight.multitenancy.tenant-delete-queue:tenant-delete-queue}")
|
||||
@Value("${fforesight.multitenancy.tenant-delete-queue:search-service-tenant-delete}")
|
||||
private String tenantDeleteEventQueueName;
|
||||
@Value("${fforesight.multitenancy.tenant-delete-dlq:tenant-delete-dlq}")
|
||||
@Value("${fforesight.multitenancy.tenant-delete-dlq:search-service-tenant-delete-error}")
|
||||
private String tenantDeleteDLQName;
|
||||
|
||||
@Value("${fforesight.multitenancy.tenant-updated-queue:tenant-updated-queue}")
|
||||
@Value("${fforesight.multitenancy.tenant-updated-queue:search-service-tenant-updated}")
|
||||
private String tenantUpdatedEventQueueName;
|
||||
@Value("${fforesight.multitenancy.tenant-updated-dlq:tenant-updated-dlq}")
|
||||
@Value("${fforesight.multitenancy.tenant-updated-dlq:search-service-tenant-updated-error}")
|
||||
private String tenantUpdatedDLQName;
|
||||
|
||||
|
||||
@Bean
|
||||
public DirectExchange indexingExchange() {
|
||||
public DirectExchange indexingRequestExchange() {
|
||||
|
||||
return new DirectExchange(INDEXING_EXCHANGE);
|
||||
return new DirectExchange(INDEXING_REQUEST_EXCHANGE);
|
||||
}
|
||||
|
||||
|
||||
@ -54,9 +54,9 @@ public class MessagingConfiguration {
|
||||
|
||||
|
||||
@Bean
|
||||
public DirectExchange deleteFromIndexExchange() {
|
||||
public DirectExchange deleteFromIndexRequestExchange() {
|
||||
|
||||
return new DirectExchange(DELETE_FROM_INDEX_EXCHANGE);
|
||||
return new DirectExchange(DELETE_FROM_INDEX_REQUEST_EXCHANGE);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ package com.iqser.red.service.search.v1.server.queue;
|
||||
|
||||
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.DELETE_FROM_INDEX_DLQ;
|
||||
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.INDEXING_DLQ;
|
||||
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.INDEXING_EXCHANGE;
|
||||
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.INDEXING_REQUEST_EXCHANGE;
|
||||
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.X_ERROR_INFO_HEADER;
|
||||
import static com.iqser.red.service.search.v1.server.configuration.MessagingConfiguration.X_ERROR_INFO_TIMESTAMP_HEADER;
|
||||
|
||||
@ -223,7 +223,7 @@ public class IndexingMessageReceiver {
|
||||
|
||||
for (FileModel file : files) {
|
||||
log.info("Will add dossier {} file {} to index queue", dossierId, file.getId());
|
||||
rabbitTemplate.convertAndSend(INDEXING_EXCHANGE,
|
||||
rabbitTemplate.convertAndSend(INDEXING_REQUEST_EXCHANGE,
|
||||
TenantContext.getTenantId(),
|
||||
IndexMessage.builder().messageType(IndexMessageType.INSERT).dossierId(dossierId).fileId(file.getId()).build(),
|
||||
message -> {
|
||||
|
||||
@ -32,15 +32,15 @@ public class TenantExchangeMessageReceiverImpl extends TenantExchangeMessageRece
|
||||
|
||||
return Set.of(TenantQueueConfiguration.builder()
|
||||
.listenerId(IndexingMessageReceiver.INDEXING_LISTENER_ID)
|
||||
.exchangeName(INDEXING_EXCHANGE)
|
||||
.queuePrefix(INDEXING_QUEUE_PREFIX)
|
||||
.exchangeName(INDEXING_REQUEST_EXCHANGE)
|
||||
.queuePrefix(INDEXING_REQUEST_QUEUE_PREFIX)
|
||||
.dlqName(INDEXING_DLQ)
|
||||
.arguments(Map.of("x-max-priority", 2))
|
||||
.build(),
|
||||
TenantQueueConfiguration.builder()
|
||||
.listenerId(IndexingMessageReceiver.DELETE_FROM_INDEX_LISTENER_ID)
|
||||
.exchangeName(DELETE_FROM_INDEX_EXCHANGE)
|
||||
.queuePrefix(DELETE_FROM_INDEX_QUEUE_PREFIX)
|
||||
.exchangeName(DELETE_FROM_INDEX_REQUEST_EXCHANGE)
|
||||
.queuePrefix(DELETE_FROM_INDEX_REQUEST_QUEUE_PREFIX)
|
||||
.dlqName(DELETE_FROM_INDEX_DLQ)
|
||||
.arguments(Map.of("x-max-priority", 2))
|
||||
.build());
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user