2017-09-04 131 views
0

我已經在卡夫卡主題中寫了一個主題爲my-topic,我正在嘗試獲取火花中的主題信息。但是當我收到長長的錯誤列表時,我在顯示卡夫卡主題細節時遇到了一些困難。我使用java來獲取數據。卡夫卡主題內容不顯示在火花中

下面是我的代碼:

public static void main(String s[]) throws InterruptedException{ 
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp"); 
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10)); 

    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 
    kafkaParams.put("group.id", "Different id is allotted for different stream"); 
    kafkaParams.put("auto.offset.reset", "latest"); 
    kafkaParams.put("enable.auto.commit", false); 

    Collection<String> topics = Arrays.asList("my-topic"); 

    final JavaInputDStream<ConsumerRecord<String, String>> stream = 
     KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
    ); 

    JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
      new PairFunction<ConsumerRecord<String, String>, String, String>() { 
       /** 
       * 
       */ 
       private static final long serialVersionUID = 1L; 

       @Override 
       public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception { 
        return new Tuple2<>(record.key(), record.value()); 
       } 
      }); 

    jPairDStream.foreachRDD(jPairRDD -> { 
      jPairRDD.foreach(rdd -> { 
       System.out.println("key="+rdd._1()+" value="+rdd._2()); 
      }); 
     }); 

    jssc.start();    
    jssc.awaitTermination(); 

    stream.mapToPair(
      new PairFunction<ConsumerRecord<String, String>, String, String>() { 
       /** 
       * 
       */ 
       private static final long serialVersionUID = 1L; 

       @Override 
       public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception { 
        return new Tuple2<>(record.key(), record.value()); 
       } 
      }); 
} 

下面是我得到的錯誤:

用放電的默認log4j的配置文件: 組織/阿帕奇/火花/ log4j-defaults.properties 17/09/04 11:41:15信息 SparkContext:運行Spark版本2.1.0 17/09/04 11:41:15 WARN NativeCodeLoader:無法爲您的 平臺加載native-hadoop庫...使用builtin-java適用的課程17/09/04 11:41:15 INFO SecurityManager:將視圖acls更改爲:11014525 17/09/04 11:41:15 INFO SecurityManager:將修改acls更改爲: 11014525 17/09/04 11:41:15 INFO SecurityManager:更改查看acls 組到:17/09/04 11:41:15 INFO SecurityManager:將修改 acls組爲:17/09/04 11:41:15 INFO SecurityManager: SecurityManager:身份驗證已禁用;用戶禁用;具有查看權限的用戶 :Set(11014525);具有查看權限的組: Set();擁有修改權限的用戶:Set(11014525);與 修改權限:Set()17/09/04 11:41:15 INFO Utils:成功 在端口56668上啓動服務'sparkDriver'。17/09/04 11:41:15信息 SparkEnv:註冊MapOutputTracker 17/09/04 11:41:15 INFO SparkEnv:註冊BlockManagerMaster 17/09/04 11:41:15信息 BlockManagerMasterEndpoint:使用 org.apache.spark.storage.DefaultTopologyMapper獲取拓撲結構 信息17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 17/09/04 11:41:15 INFO DiskBlockManager: 創建本地目錄爲 C:\ Users \ 11014525 \ AppData \ Local \ Temp \ blockmgr-cba489b9-2458-455a -8c03-4c4395a01d44 17/09/04 11:41:15 INFO備忘錄ryStore:MemoryStore以容量啓動 896.4 MB 17/09/04 11:41:16 INFO SparkEnv:註冊OutputCommitCoordinator 17/09/04 11:41:16 INFO實用程序:成功 在端口4040上啓動服務「SparkUI」。17/09/04 11:41:16信息 SparkUI:將SparkUI綁定到0.0.0.0,並從 開始http://172.16.202.21:4040 17/09/04 11:41:16信息執行程序:在主機localhost上啓動 executor ID驅動程序17/09/04 11:41:16 INFO Utils: 成功啓動服務 'org.apache.spark.network.netty.NettyBlockTransferService'on port 56689. 17/09/04 11:41:16 INFO NettyBlockTransferService:在172.16.202.21上創建的服務器:56689 17/09/04 11:41:16 INFO BlockManager: 使用org.apache.spark.storage.RandomBlockReplicationPolicy for bloc k 複製策略17/09/04 11:41:16 INFO BlockManagerMaster: 註冊BlockManager BlockManagerId(驅動程序,172.16.202.21,56689, 無)17/09/04 11:41:16 INFO BlockManagerMasterEndpoint:註冊 塊管理器172.16.202.21:56689與896.4 MB RAM, BlockManagerId(驅動程序,172.16.202.21,56689,無)17/09/04 11:41:16 INFO BlockManagerMaster:註冊的BlockManager BlockManagerId(驅動程序,172.16.202.21,56689,無)17/09/04 11:41:16 INFO BlockManager:初始化BlockManager:BlockManagerId(驅動程序, 172.16.202。21,56689,None)17/09/04 11:41:16 WARN KafkaUtils:爲executor覆蓋enable.auto.commit爲false 17/09/04 11:41:16 WARN KafkaUtils:覆蓋auto.offset.reset沒有執行者 17/09/04 11:41:16 WARN KafkaUtils:覆蓋執行器group.id到 spark-executor-different id分配給不同的流17/09/04 11:41:16 WARN KafkaUtils:覆蓋receive.buffer.bytes到65536見 KAFKA-3135 17/09/04 11:41:16 INFO DirectKafkaInputDStream:滑動時間 = 10000毫秒17/09/04 11:41:16信息DirectKafkaInputDStream:存儲級別=序列化1x複製17/09/04 11:41:16信息 DirectKafkaInputDStream:Checkpoint interval = null 17/09/04 11:41:16 信息DirectKafkaInputDStream:記住間隔= 10000毫秒17/09/04 11:41:16 INFO DirectKafkaInputDStream:初始化並驗證 [email protected]407b 17/09/04 11:41:16 INFO MappedDStream:Slide time = 10000 ms 17/09/04 11:41:16 INFO MappedDStream:存儲級別=序列化1x複製 17/09/04 11:41:16 INFO MappedDStream:Checkpoint interval = null 17/09/04 11:41:16 INFO MappedDStream:記住interval = 10000 ms 17/09/04 11:41:16 INFO MappedDStream:已初始化並生效 [email protected] 17/09/04 11:41:16 INFO ForEachDStream:Slide time = 10000毫秒17/09/04 11:41:16 信息ForEachDStream:存儲級別=序列化1x複製17/09/04 11:41:16 INFO ForEachDStream:Checkpoint interval = null 17/09/04 11:41:16 INFO ForEachDStream:記住間隔= 10000毫秒17/09/04 11:41:16 INFO ForEachDStream:已初始化並驗證過 [email protected] 17/09/04 11:41:16 ERROR StreamingContext:啓動上下文時出錯,標記爲 it stopped org.apache.kafka.common.config.ConfigException:Missing 需要的配置「partition.assignment.strategy」,它沒有 默認值。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)在 org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48) 在 org.apache。 kafka.clients.consumer.ConsumerConfig。(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer。 KafkaConsumer。(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy。 scala:83) at org.apach e.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) 在 org.apache.spark.streaming在 scala.collection .DStreamGraph $$ anonfun $開始$ 5.apply(DStreamGraph.scala:49) 在 org.apache.spark.streaming.DStreamGraph $$ anonfun $開始$ 5.apply(49 DStreamGraph.scala) .parallel.mutable.ParArray $ ParArrayIterator.foreach_quick(ParArray.scala:143) 在 scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach(ParArray.scala:136) 在 scala.collection.parallel.ParIterableLike $ Foreach.leaf(ParIterableLike.scala :972) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply $ mcV $ sp(Tasks.scala:49) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply (任務。Scala:48) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48) at scala.collection.parallel.Task $ class.tryLeaf(Tasks.scala:51)at scala.collection.parallel.ParIterableLike $ Foreach.tryLeaf(ParIterableLike.scala:969) 在 scala.collection.parallel.AdaptiveWorkStealingTasks $ WrappedTask $ class.compute(Tasks.scala:152) 在 scala.collection.parallel .AdaptiveWorkStealingForkJoinTasks $ WrappedTask.compute(Tasks.scala:443) 在 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260 ) 在 scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread .run(ForkJoinWorkerThread.java:107) at ...在單獨的線程中使用org.apache.spark.util.ThreadUtils運行 ...()at org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext .scala:578) 在 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 在 org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556 ) at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16 INFO ReceiverTracker:ReceiverTracker已停止 17/09/04 11:41:16信息JobGenerator:立即停止JobGenerator 17/09/04 11:41:16 INFO RecurringTimer:JobGenerator的停止計時器 after time -1 17/09/04 11:41:16 INFO JobGenerator:停止 JobGenerator 17/09/04 11:41:16 INFO JobScheduler:停止JobScheduler 線程「主」中的異常 org.apache.kafka.common.config.ConfigException:缺少必需的 配置「partition.assignment.strategy」,它沒有默認的 值。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)在 org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48) 在 org.apache。 kafka.clients.consumer.ConsumerConfig。(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer。 KafkaConsumer。(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy。 scala:83) at org.apach e.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) 在 org.apache.spark.streaming在 scala.collection .DStreamGraph $$ anonfun $開始$ 5.apply(DStreamGraph.scala:49) 在 org.apache.spark.streaming.DStreamGraph $$ anonfun $開始$ 5.apply(49 DStreamGraph.scala) .parallel.mutable.ParArray $ ParArrayIterator.foreach_quick(ParArray.scala:143) 在 scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach(ParArray.scala:136) 在 scala.collection.parallel.ParIterableLike $ Foreach.leaf(ParIterableLike.scala :972) at scala。collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply $ mcV $ sp(Tasks.scala:49) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48) 在 scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48) at scala.collection.parallel.Task $ class.tryLeaf(Tasks.scala:51)at scala.collection。 parallel.ParIterableLike $ Foreach.tryLeaf(ParIterableLike.scala:969) 在 scala.collection.parallel.AdaptiveWorkStealingTasks $ WrappedTask $ class.compute(Tasks.scala:152) 在 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks $ WrappedTask。 compute(Tasks.scala:443) at scala.concurrent.forkjoin.Recursi veAction.exec(RecursiveAction.java:160) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 在...在單獨的線程中使用組織運行。 apache.spark.util.ThreadUtils ...()在 org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext.scala:578) 在 org.apache.spark.streaming.StreamingContext.start(聖reamingContext.scala:572) 在 org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) 在 Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16信息SparkContext:從關機調用stop() hook 17/09/04 11:41:16 INFO SparkUI:停止Spark Web UI http://172.16.202.21:4040 17/09/04 11:41:16信息 MapOutputTrackerMasterEndpoint:MapOutputTrackerMasterEndpoint 停止! 17/09/04 11:41:16 INFO MemoryStore:MemoryStore清除 17/09/04 11:41:16 INFO BlockManager:BlockManager停止17/09/04 11:41:16 INFO BlockManagerMaster:BlockManagerMaster停止17/09/04/04 11:41:16 INFO OutputCommitCoordinator $ OutputCommitCoordinatorEndpoint: OutputCommitCoordinator已停止! 17/09/04 11:41:16信息SparkContext: 成功停止SparkContext 17/09/04 11:41:16信息 ShutdownHookManager:Shutdown hook called 17/09/04 11:41:16信息 ShutdownHookManager:刪除目錄 C:\用戶\ 11014525 \應用程序數據\本地\ TEMP \火花37334cdc-9680-4801-8e50-ef3024ed1d8a

的pom.xml

org.apache.spark 火花streaming_2 .11 2.1.0 commons-l ANG 公共琅 2.6 org.apache.kafka kafka_2.10 0.8.2.0 org.apache.spark 火花流 - 卡夫卡0-10_2.10 2.1.1

+0

org.apache.kafka.common.config.ConfigException:缺少必需的配置「partition.assignment.strategy」,它沒有默認值。 可能與它有關 – Stultuske

+0

@bleedcode我根據您提供的pom.xml細節編輯了我的答案。請嘗試一下,讓我知道它是否能解決您的問題。 – abaghel

回答

1

從日誌中,你的spark版本是2.1.0。您沒有共享具有其他依賴關係的構建文件。它看起來像你在類路徑中都有spark-streaming-kafka-0-8_2.11-2.1.0.jarspark-streaming-kafka-0-10_2.11-2.1.0.jar,並且它加載了錯誤的類。如果您使用的是maven,那麼您需要像下面這樣的依賴關係。請檢查並更新您的項目。

<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.0</version> 
</dependency> 
<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>2.1.0</version> 
</dependency> 
<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> 
</dependency> 
<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.0</version> 
</dependency> 

編輯

正如你所編輯的問題,並張貼依賴我編輯我的答案。您正在使用卡夫卡版本0.8.*,而您的spark-streaming-kafka版本是0.10.*。請使用相同版本的Kafka依賴項。請使用以下的依賴於org.apache.kafka

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.2.0</version> 
</dependency> 
+0

它仍然沒有工作,,, – BleedCode

+0

你得到什麼錯誤?如果舊罐子仍然存在,請檢查你的版本。請使用Maven更新項目和清理版本。 – abaghel