2017-03-31 65 views
1

我最近開始嘗試使用kafka流。我有一個場景,我需要加入KStreamKTable。可能是KTable不包含某些密鑰。在這種情況下,我得到一個NullPointerException當KTable丟失密鑰時,處理KStream與KTable的連接

具體我是越來越

流線[StreamThread-1]處理過程中Streams應用程序錯誤: 顯示java.lang.NullPointerException

我不知道我怎麼可以搞定。我不能以某種方式過濾掉不符合表條目的流的記錄。

更新

看遠一點,我發現我可以查詢底層的店裏找是否通過ReadOnlyKeyValueStore接口存在的關鍵。

在這種情況下,我的問題是,這是最好的方式嗎?即根據本地存儲中是否存在密鑰來過濾要加入的流?

我在這種情況下,第二個問題是,因爲我在乎利用在下一階段10.2版中引入的Global State Store,我應該預料到我會也能以同樣的方式來查詢Global State Store

更新

上一次更新是不準確的,因爲它無法從拓撲

最後更新

內部查詢狀態存儲理解加入語義好一點後我能夠解決這個問題只是簡化valueJoiner只返回結果,而不是對連接的值執行操作,並在連接後添加額外的過濾步驟t o過濾掉空值。

+0

我還是有點困惑。你什麼時候準確得到'NullPointerException'?你使用什麼版本的卡夫卡?你使用什麼「類型」連接(即內連接或左連接)?另外看看這個:http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join –

+0

你可能想自己回答你的問題,並接受你自己的答案。或畢竟刪除了這個問題:) –

+0

我試圖在流上做一個'leftJoin'。我用我收到的信息更新了問題。我在'0.10.1'版本。我會妥善制定答案並提交。謝謝:) – LetsPlayYahtzee

回答

1

解決我的問題來自於理解join語義更好一點。

和數據庫連接一樣(儘管我並不是說Kstream連接遵循db連接概念)左連接操作會導致無論右側鍵丟失的位置都有空值的行。因此最終我唯一要做的就是將我的valueJoiner與後續的計算/操作(我需要對連接記錄的字段執行一些計算並返回一個新構造的對象)解耦,並且只返回一個連接值的數組。然後,我可以通過檢查這些數組來篩選出導致null值的記錄。

基於馬蒂亞斯的J.薩克斯建議,我使用的0.10.2版本代替0.10.1這與經紀人版本0.10.1兼容的,並與內更換整個leftJoin邏輯加入其去除用於濾除null值的需要。