0
我一直在向Kafka主題發送消息,這些消息在主題中使用JSON,我使用KafkaSpout
從Kafka獲取消息並使用shuffle將消息發送給螺栓分組。現在我想要在KafkaSpout
和bolt之間實現字段組合。請任何人都可以幫我解決這個問題。如何實現KafkaSpout
與螺栓之間的區域分組。我如何實現KafkaSpout和螺栓之間的字段分組
我一直在向Kafka主題發送消息,這些消息在主題中使用JSON,我使用KafkaSpout
從Kafka獲取消息並使用shuffle將消息發送給螺栓分組。現在我想要在KafkaSpout
和bolt之間實現字段組合。請任何人都可以幫我解決這個問題。如何實現KafkaSpout
與螺栓之間的區域分組。我如何實現KafkaSpout和螺栓之間的字段分組
您需要實現backtype.storm.spout.scheme
接口,基本上它看起來是這樣的:
public class FooScheme implements Scheme {
public Values deserialize(final byte[] _line) {
try{
Values values = new Values();
JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line));
values.add((String) msg.get("a"));
values.add((String) msg.get("b"))
values.add(msg)
}
catch(ParseException e) {
//handle the exception
return null;
}
}
public Fields getOutputFields() {
return new Fields("a", "b", "json");}
}
,你與你的嘴像這樣使用:
SpoutConfig spoutConfig = new SpoutConfig(... your config here ...);
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
topology.setSpout("kafka-spout", 1).setNumTasks(1);
,現在你可以準備好使用「a」或「b」或兩者的字段分組。
FooBolt bolt = new FooBolt();
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1)
.fieldsGrouping("kafka-spout", new Fields("a","b"));
享受
我曾試圖用你的代碼示例實現,但我可以得到下面的錯誤--- java.lang.IllegalArgumentException異常:有錯號碼字段創建的元組。預期1個字段,但在backtype.storm.tuple.TupleImpl處獲得2個字段 。(TupleImpl.java:55)〜[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] –
@RishiArora getOutputFields中的字段數與反序列化中的值的數量? –
公共值反序列化(最後一個字節[] _line){ \t \t // TODO自動生成方法存根 \t \t嘗試{ \t \t \t值的值=新值(); \t \t \t的JSONObject味精=(的JSONObject)JSONValue \t \t \t \t \t .parseWithException(新字符串(_line)); \t \t \t //values.add((String)msg.get(「id」)); \t \t \t values.add(msg); \t \t \t返回值; –