2017-08-03 202 views
0

Druid Kafka ingestion (imply-2.2.3): Kafka error NoReplicaOnlineException

I am using the Druid Kafka Indexing Service to load my own Kafka stream, following the Load from Kafka tutorial.

All Kafka settings are at their defaults (just extracted from the tgz).

When I start imply-2.2.3 (Druid) with empty data (after deleting the var folder), everything works fine.

But when I stop kafka_2.11-0.10.2.0 and start it again, the error below appears and Druid Kafka ingestion no longer works until I stop imply (Druid) and delete all its data again (i.e. remove the var folder).

Sometimes Druid does not ingest data from Kafka even though there is no error in Kafka either. Deleting the var folder in Druid fixes everything until the same error occurs again.

The error:

kafka.common.NoReplicaOnlineException: No replica for partition [__consumer_offsets,19] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] 
    at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:73) ~[kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:339) ~[kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:200) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:115) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:112) [kafka_2.11-0.10.2.0.jar:?] 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) [scala-library-2.11.8.jar:?] 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.8.jar:?] 
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:112) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:67) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:342) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:160) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:85) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:51) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:681) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController.startup(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.KafkaServer.startup(KafkaServer.scala:224) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.Kafka$.main(Kafka.scala:67) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.Kafka.main(Kafka.scala) [kafka_2.11-0.10.2.0.jar:?] 

The steps I follow:

1. Start imply:

bin/supervise -c conf/supervise/quickstart.conf 

2. Start Kafka:

./bin/kafka-server-start.sh config/server.properties 

3. Create the topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikiticker 

4. Enable Druid Kafka ingestion:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/wikiticker-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor 

5. Post events to the Kafka topic; they are then ingested into Druid by the Kafka indexing service.
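For step 5, events can be sent with the console producer that ships with Kafka. This is a command sketch; the JSON event below is a made-up sample, not the tutorial's actual data:

```shell
# Send one newline-delimited JSON event to the wikiticker topic
# (requires the broker started in step 2 to be running).
echo '{"time":"2017-08-03T12:00:00Z","channel":"#en.wikipedia","added":17}' | \
  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikiticker
```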

In all of the .properties files (common.runtime.properties, broker, coordinator, historical, middleManager, overlord) I added the property:

druid.extensions.loadList=["druid-caffeine-cache", "druid-histogram", "druid-datasketches", "druid-kafka-indexing-service"] 

including "druid-kafka-indexing-service", which provides the ingestion service.

I believe problems like this should not happen with Druid Kafka Indexing.

Is there a way to figure this problem out?

Answers

0

This message indicates that the broker with broker id 0 is down, and since it was the only broker hosting that partition, the partition is currently unavailable. You must make sure broker 0 is up and serving.
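One way to check which brokers are currently alive is to inspect ZooKeeper with the zookeeper-shell tool bundled with Kafka (a command sketch using the standard ZooKeeper paths):

```shell
# List the broker ids currently registered in ZooKeeper.
# An empty list here matches the "Live brokers are: [Set()]" in the error.
./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

# Inspect the leader and replica assignment for the offending partition.
./bin/zookeeper-shell.sh localhost:2181 get /brokers/topics/__consumer_offsets/partitions/19/state
```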

0

It looks like you have a single-node Kafka cluster and its only broker node is down. That is not a very fault-tolerant setup. You should run 3 Kafka brokers and create all topics with a replication factor of 3, so the system keeps working even when one or two brokers are down. Single-node clusters are generally only used for development.
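As a sketch, a 3-broker cluster on one machine can be set up by copying server.properties three times; the ports and log directories below are illustrative:

```shell
# Three copies of config/server.properties, each with a unique id, port, and log dir:
#   server-0.properties: broker.id=0  listeners=PLAINTEXT://:9092  log.dirs=/tmp/kafka-logs-0
#   server-1.properties: broker.id=1  listeners=PLAINTEXT://:9093  log.dirs=/tmp/kafka-logs-1
#   server-2.properties: broker.id=2  listeners=PLAINTEXT://:9094  log.dirs=/tmp/kafka-logs-2
# Also set offsets.topic.replication.factor=3 in each file so the internal
# __consumer_offsets topic (the one in the error) is replicated as well.
./bin/kafka-server-start.sh config/server-0.properties &
./bin/kafka-server-start.sh config/server-1.properties &
./bin/kafka-server-start.sh config/server-2.properties &

# Recreate the topic with replication factor 3:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 3 --partitions 1 --topic wikiticker
```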

0

I solved the Kafka stability issue by adding 3 Kafka brokers and setting a replication factor of 3 for all topics.

On the Druid side I fixed the problem by increasing druid.worker.capacity in the middleManager and decreasing taskDuration in the ioConfig of the supervisor spec.
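A sketch of the two Druid-side changes described above; the capacity value and the taskDuration shown here are illustrative, not values stated in the answer:

```
# conf/druid/middleManager/runtime.properties
# Raise worker capacity so enough task slots exist for the Kafka indexing tasks.
druid.worker.capacity=5

# ioConfig section of the supervisor spec (wikiticker-kafka-supervisor.json),
# with taskDuration lowered so tasks hand off segments more often:
"ioConfig": {
  "topic": "wikiticker",
  "consumerProperties": { "bootstrap.servers": "localhost:9092" },
  "taskCount": 1,
  "replicas": 1,
  "taskDuration": "PT10M"
}
```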

Details are in another question.