diff --git a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java index 4b6a6cd..e4c4d8f 100644 --- a/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java +++ b/layoutparser-service/layoutparser-service-server/src/main/java/com/knecon/fforesight/service/layoutparser/server/queue/MessageHandler.java @@ -1,6 +1,9 @@ package com.knecon.fforesight.service.layoutparser.server.queue; import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.core.Message; @@ -28,6 +31,7 @@ public class MessageHandler { private final LayoutParsingPipeline layoutParsingPipeline; private final ObjectMapper objectMapper; private final RabbitTemplate rabbitTemplate; + private final static String X_PIPELINE_PREFIX = "X-PIPE-"; @RabbitHandler @@ -46,14 +50,24 @@ public class MessageHandler { layoutParsingRequest.identifier())); } LayoutParsingFinishedEvent layoutParsingFinishedEvent = layoutParsingPipeline.parseLayoutAndSaveFilesToStorage(layoutParsingRequest); - sendLayoutParsingFinishedEvent(layoutParsingFinishedEvent); + sendLayoutParsingFinishedEvent(layoutParsingFinishedEvent, message); } - public void sendLayoutParsingFinishedEvent(LayoutParsingFinishedEvent layoutParsingFinishedEvent) { + public void sendLayoutParsingFinishedEvent(LayoutParsingFinishedEvent layoutParsingFinishedEvent, Message message) { Arrays.stream(layoutParsingFinishedEvent.message().split("\n")).forEach(log::info); - rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE, layoutParsingFinishedEvent); + rabbitTemplate.convertAndSend(LayoutParsingQueueNames.LAYOUT_PARSING_FINISHED_EVENT_QUEUE, layoutParsingFinishedEvent, m -> { + var forwardHeaders = message.getMessageProperties() + .getHeaders() + .entrySet() + .stream() + .filter(e -> e.getKey().toUpperCase(Locale.ROOT).startsWith(X_PIPELINE_PREFIX)) + .collect(Collectors.toMap(Map.Entry::getKey, + Map.Entry::getValue)); + m.getMessageProperties().getHeaders().putAll(forwardHeaders); + return m; + }); } }