2017-02-17 153 views
1

我正在使用卡夫卡0.10.1.1和風暴1.0.2。在kafka集成的風暴文檔中,我可以看到,使用zookeeper服務器初始化kafka spout時,仍然使用zookeeper維護偏移量。 如何使用kafka服務器引導噴口。是否有任何示例。從使用動物園管理員風暴文檔卡夫卡噴口集成

BrokerHosts hosts = new ZkHosts(zkConnString); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

此選項 實施例工作正常且消耗的消息。但我無法將消費者羣體或風暴節點視爲kafkamanager用戶。

嘗試的替代方法是這樣的。

KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig(); 

KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig); 

private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() { 

     Map<String, Object> props = new HashMap<>(); 
     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers); 
     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID); 
     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); 

     String[] topics = new String[1]; 
     topics[0] = topicName; 

     KafkaSpoutStreams kafkaSpoutStreams = 
       new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build(); 

     KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = 
       new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build(); 

     KafkaSpoutConfig<String, String> spoutConf = 
       new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build(); 

     return spoutConf; 
    } 

但是這個解決方案在從kafka讀取一些消息後顯示CommitFailedException。

回答

0

Storm-kafka在通用kafka客戶端的zookeeper中以不同位置和不同格式寫入消費者信息。所以你不能在kafkamanager ui中看到它。

您可以找到一些其他監視器工具,如 https://github.com/keenlabs/capillary