Pull request #64: RED-5151: Reject redelivered messages
Merge in RED/search-service from RED-5151 to master * commit '060f4c45690204ac1bddec30e8a50b8490237de8': RED-5151: Reject redelivered messages
This commit is contained in:
commit
239ebe4a77
@ -6,8 +6,6 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.iqser.red.service.search.v1.model.IndexMessage;
|
||||
import com.iqser.red.service.search.v1.model.IndexMessageType;
|
||||
import com.iqser.red.service.search.v1.server.queue.IndexingMessageReceiver;
|
||||
@ -25,21 +23,19 @@ public class MigrationStarterService {
|
||||
private final ApplicationContext ctx;
|
||||
private final IndexInformationService indexInformationService;
|
||||
private final IndexingMessageReceiver indexingMessageReceiver;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final SearchServiceSettings settings;
|
||||
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void migrate() throws JsonProcessingException {
|
||||
public void migrate() {
|
||||
|
||||
// This can only run in post upgrade hook, because otherwise the old service is still runnnig.
|
||||
if (settings.isMigrateOnly()) {
|
||||
if (indexInformationService.hasIndexChanged()) {
|
||||
log.info("Index has changed and will be closed, dropped, recreated and all files will be indexed");
|
||||
String indexMessage = objectMapper.writeValueAsString(IndexMessage.builder()
|
||||
indexingMessageReceiver.receiveIndexingRequest(IndexMessage.builder()
|
||||
.messageType(IndexMessageType.DROP)
|
||||
.build());
|
||||
indexingMessageReceiver.receiveIndexingRequest(indexMessage);
|
||||
}
|
||||
System.exit(SpringApplication.exit(ctx, () -> 0));
|
||||
}
|
||||
|
||||
@ -7,6 +7,8 @@ import static com.iqser.red.service.search.v1.server.queue.MessagingConfiguratio
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
@ -32,6 +34,7 @@ import com.iqser.red.service.search.v1.server.service.TextStorageService;
|
||||
|
||||
import io.micrometer.core.annotation.Timed;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@ -53,11 +56,25 @@ public class IndexingMessageReceiver {
|
||||
private final IndexInformationService indexInformationService;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = INDEXING_QUEUE)
|
||||
public void receiveIndexingRequest(String in) throws JsonProcessingException {
|
||||
public void receiveIndexingRequest(Message message){
|
||||
|
||||
var indexRequest = objectMapper.readValue(message.getBody(), IndexMessage.class);
|
||||
|
||||
// This prevents from endless retries oom errors.
|
||||
if(message.getMessageProperties().isRedelivered()){
|
||||
throw new AmqpRejectAndDontRequeueException(String.format("Error during last processing of request with dossierId: %s and fileId: %s, do not retry.", indexRequest.getDossierId(), indexRequest.getFileId()));
|
||||
}
|
||||
|
||||
receiveIndexingRequest(indexRequest);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void receiveIndexingRequest(IndexMessage indexRequest) {
|
||||
|
||||
var indexRequest = objectMapper.readValue(in, IndexMessage.class);
|
||||
log.info("Processing indexing request: {}", indexRequest);
|
||||
|
||||
switch (indexRequest.getMessageType()) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user