0

我試圖創建一個基於事件的系統,使用Apache Kafka作爲消息系統和Spring雲流Kafka進行通信。無法過濾使用Spring雲流條件屬性接收到的消息流@StreamListener註釋

我已經如下書面我的接收器類的方法,

@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'") 
    public void handleEmployeeCreatedEvent(@Payload String payload) { 
     logger.info("Received EmployeeCreatedEvent: " + payload); 
    } 

這種方法是特別捕捉針對與EmployeeCreatedEvent消息或事件。

@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'") 
    public void handleEmployeeTransferredEvent(@Payload String payload) { 
     logger.info("Received EmployeeTransferredEvent: " + payload); 
    } 

此方法專門用於捕獲與EmployeeTransferredEvent相關的消息或事件。

@StreamListener(target = Sink.INPUT) 
    public void handleDefaultEvent(@Payload String payload) { 
     logger.info("Received payload: " + payload); 
    } 

這是默認方法。

當我運行該應用程序時,我無法看到使用condition屬性被調用的方法。我只看到handleDefaultEvent方法被調用。

我從發送/源應用程序使用以下CustomMessageSource如下類發送信息給該接收機中的應用,

@Component 
@EnableBinding(Source.class) 
public class CustomMessageSource { 
    @Autowired 
    private Source source;     


    public void sendMessage(String payload,String eventType) { 
     Message<String> myMessage = MessageBuilder.withPayload(payload) 
       .setHeader("eventType", eventType) 
       .build(); 
     source.output().send(myMessage); 

    } 

} 

我打電話從我在源應用控制器如以下的方法中,

customMessageSource.sendMessage("Hello","EmployeeCreatedEvent"); 

的customMessageSource實例自動裝配下面,

@Autowired 
CustomMessageSource customMessageSource; 

Basicaly,我想篩選接收器/接收器應用程序接收到的消息並相應地處理它們。

爲此,我使用了帶條件屬性的@StreamListener註釋來模擬處理不同事件的行爲。

我正在使用Spring Cloud Stream Chelsea.SR2版本。

有人可以幫助我解決這個問題。

回答

1

看起來好像頭不傳播。確保您在spring.cloud.stream.kafka.binder.headershttp://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_properties中包含自定義標頭。

+0

感謝您的回覆。但是,我沒有得到你的迴應。我明確地構造了有效負載和頭部作爲輸入接收的消息。我不知道爲什麼我應該在配置文件中包含自定義標題。 – juser

+0

默認情況下,Spring雲流不傳播自定義標頭。這個設置會告訴生產者應用程序,我們應該。 –

+0

卡夫卡(目前)沒有標題的概念;我們必須將它們嵌入數據中;因此選擇加入方法,以避免添加不必要的數據。 –