2017-08-30 83 views
1

目前我正在構建數據管道。我正在從sql數據庫中讀取2個表,並且我必須在使用kafka流將數據流加入到數據倉庫後,將它們以非規範化格式存儲在OLAP數據倉庫中。加入包含Java哈希映射對象的Kafka流

而不是每個表有單獨的主題,我有兩個表將數據插入單個主題。

我將行轉換爲散列表,然後使用字節序列化程序將此信息轉換爲字節數組並將其推送到主題,因此一行中的所有信息都存儲在單個對象中。的代碼是:

ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
ObjectOutput out = null; 
byte[] yourBytes = null; 
try { 
     out = new ObjectOutputStream(bos); 
     out.writeObject(record); 
     //here record is the row hashmap 
     out.flush(); 
     yourBytes = bos.toByteArray(); 
    } 
catch (IOException ex) { 
    // ignore close exception 
    } 

在流處理應用我反序列化字節陣列回HashMap和過濾器記錄到每個兩個單獨的流爲一個表。

所以我的記錄在處理相反序列化字節陣列回HashMap對象後,記錄如下所示,其中顯示如下屬於每個表中的每個流的一個記錄:

(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57}) 

(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"}) 

現在我有加入兩個數據流中的數據,其中每個數值都是一個哈希映射,並加入關鍵字PRODUCTID這是兩個表的公用字段,最後爲每行生成一個哈希表並將該數據流推送到主題。

所以聯接的記錄看起來有類似如下:

(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"}) 

是有可能做到這一點使用的卡夫卡流,如果是,那麼如何?

回答

2

如果你想在卡夫卡流加入,你需要提取的連接屬性,並將其設置爲信息的關鍵字:

KStream streamOfTable1 = ... 
streamOfTable1.selectKey(/*extract productId and set as key*/).to("newTopic1"); 

KStream streamOfTable2 = ... 
streamOfTable2.selectKey(/*extract productId and set as key*/).to("newTopic2"); 

KTable table1 = builder.table("newTopic1"); 
KTable table2 = builder.table("newTopic2"); 

table1.join(table2, ...).to("resultTopic"); 

更多詳情請參閱文檔:http://docs.confluent.io/current/streams/developer-guide.html#joining

我確實假設KTable-KTable連接是你需要的。請注意,您需要手動創建「newTopic1」和「newTopic2」,並且兩者都需要具有相同數量的分區。 (參考http://docs.confluent.io/current/streams/developer-guide.html#user-topics

如果KTable-KTable連接不是您想要的,請檢查其他可用連接類型。

+0

感謝@Matthias它的工作。我已經爲此使用了KTables。 – Anmol