2016-08-04 163 views
1

對於下面的代碼,stream1和stream2都單獨運行,我可以看到輸出,但是加入的流根本不會記錄任何內容。我有一種感覺,它與加入窗口有關,但兩個數據流的數據幾乎完全同時出現。無法獲得加入的Kafka流來運行或輸出任何信息

val stream = builder.stream(stringSerde, byteArraySerde, "topic") 

val stream1 = stream 
    .filter((key, value) => somefilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic1") 

val stream2 = stream 
    .filter((key, value) => someotherfilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic2") 

val joinedStream = stream1 
    .join(stream2, (value1: Array[Byte], value2: Array[Byte]) => { 
    println("wont print anything") 
    return somerandomdata 
    }, 
    JoinWindows.of("othertopic").within(10000L), 
    stringSerde, byteArraySerde, byteArraySerde) 
+1

聯接窗口即包含在元數據每個記錄都附加到鍵和值)。如果您打印這些時間戳進行調試,這將有所幫助。要訪問它們,您需要使用process() - 給定的'context'對象,包含當前處理的記錄的時間戳(即,每個處理後的記錄更新上下文)。 –

回答

相關問題