2015-09-07 88 views
0

我一直在向Kafka主題發送消息,這些消息在主題中使用JSON,我使用KafkaSpout從Kafka獲取消息並使用shuffle將消息發送給螺栓分組。現在我想要在KafkaSpout和bolt之間實現字段組合。請任何人都可以幫我解決這個問題。如何實現KafkaSpout與螺栓之間的區域分組。我如何實現KafkaSpout和螺栓之間的字段分組

回答

2

您需要實現backtype.storm.spout.scheme接口,基本上它看起來是這樣的:

public class FooScheme implements Scheme { 

    public Values deserialize(final byte[] _line) { 

    try{ 
      Values values = new Values(); 
      JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line)); 
      values.add((String) msg.get("a")); 
      values.add((String) msg.get("b")) 
      values.add(msg) 
     } 
     catch(ParseException e) { 
      //handle the exception 
      return null; 
     } 

    } 

    public Fields getOutputFields() { 
    return new Fields("a", "b", "json");} 
} 

,你與你的嘴像這樣使用:

SpoutConfig spoutConfig = new SpoutConfig(... your config here ...); 
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 
topology.setSpout("kafka-spout", 1).setNumTasks(1); 

,現在你可以準備好使用「a」或「b」或兩者的字段分組。

FooBolt bolt = new FooBolt(); 
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1) 
     .fieldsGrouping("kafka-spout", new Fields("a","b")); 

享受

+0

我曾試圖用你的代碼示例實現,但我可以得到下面的錯誤--- java.lang.IllegalArgumentException異常:有錯號碼字段創建的元組。預期1個字段,但在backtype.storm.tuple.TupleImpl處獲得2個字段 。 (TupleImpl.java:55)〜[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] –

+0

@RishiArora getOutputFields中的字段數與反序列化中的值的數量? –

+0

公共值反序列化(最後一個字節[] _line){ \t \t // TODO自動生成方法存根 \t \t嘗試{ \t \t \t值的值=新值(); \t \t \t的JSONObject味精=(的JSONObject)JSONValue \t \t \t \t \t .parseWithException(新字符串(_line)); \t \t \t //values.add((String)msg.get(「id」)); \t \t \t values.add(msg); \t \t \t返回值; –