2017-02-27 113 views
1
final KStream<String, EmpModel> empModelStream = getMapOperator(empoutStream); 
final KStream<String, EmpModel> empModelinput = getMapOperator(inputStream); 

// empModelinput.print(); 
// empModelStream.print(); 

empModelStream.join(empModelinput, new ValueJoiner<EmpModel, EmpModel, Object>() { 

    @Override 
    public Object apply(EmpModel paramV1, EmpModel paramV2) { 
     System.out.println("Model1 "+paramV1.getKey()); 
     System.out.println("Model2 "+paramV2.getKey()); 
     return paramV1; 
    } 

},JoinWindows.of("2000L")); 

兩個KStreams我得到錯誤:卡夫卡流API:我加盟empmodel

Invalid topology building: KSTREAM-MAP-0000000003 and KSTREAM-MAP-0000000004 are not joinable

回答

2

如果你想連接兩個KStreams你必須確保兩個具有相同數量的分區。 (參見「注」 中​​盒)

如果使用卡夫卡v0.10.1+,重新分區將自動發生(參見http://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning)。

卡夫卡v0.10.0.x你有兩種選擇:

  1. 確保原始輸入主題做有相同數量的分區
  2. 的,或者以.through("my-repartitioning-topic")呼叫在連接前添加到KStream S的一個。您需要創建主題"my-repartioning-topic"與正確數量的分區(即,分區數量與第二個KStream的原始輸入主題相同),然後再啓動您的Streams應用程序