我試圖創建一個基於事件的系統,使用Apache Kafka作爲消息系統和Spring雲流Kafka進行通信。無法過濾使用Spring雲流條件屬性接收到的消息流@StreamListener註釋
我已經如下書面我的接收器類的方法,
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
logger.info("Received EmployeeCreatedEvent: " + payload);
}
這種方法是特別捕捉針對與EmployeeCreatedEvent消息或事件。
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
logger.info("Received EmployeeTransferredEvent: " + payload);
}
此方法專門用於捕獲與EmployeeTransferredEvent相關的消息或事件。
@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
logger.info("Received payload: " + payload);
}
這是默認方法。
當我運行該應用程序時,我無法看到使用condition屬性被調用的方法。我只看到handleDefaultEvent方法被調用。
我從發送/源應用程序使用以下CustomMessageSource如下類發送信息給該接收機中的應用,
@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
@Autowired
private Source source;
public void sendMessage(String payload,String eventType) {
Message<String> myMessage = MessageBuilder.withPayload(payload)
.setHeader("eventType", eventType)
.build();
source.output().send(myMessage);
}
}
我打電話從我在源應用控制器如以下的方法中,
customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");
的customMessageSource實例自動裝配下面,
@Autowired
CustomMessageSource customMessageSource;
Basicaly,我想篩選接收器/接收器應用程序接收到的消息並相應地處理它們。
爲此,我使用了帶條件屬性的@StreamListener註釋來模擬處理不同事件的行爲。
我正在使用Spring Cloud Stream Chelsea.SR2版本。
有人可以幫助我解決這個問題。
感謝您的回覆。但是,我沒有得到你的迴應。我明確地構造了有效負載和頭部作爲輸入接收的消息。我不知道爲什麼我應該在配置文件中包含自定義標題。 – juser
默認情況下,Spring雲流不傳播自定義標頭。這個設置會告訴生產者應用程序,我們應該。 –
卡夫卡(目前)沒有標題的概念;我們必須將它們嵌入數據中;因此選擇加入方法,以避免添加不必要的數據。 –