我已經在卡夫卡主題中寫了一個主題爲my-topic
,我正在嘗試獲取火花中的主題信息。但是當我收到長長的錯誤列表時,我在顯示卡夫卡主題細節時遇到了一些困難。我使用java來獲取數據。卡夫卡主題內容不顯示在火花中
下面是我的代碼:
public static void main(String s[]) throws InterruptedException{
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "Different id is allotted for different stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("my-topic");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
jPairDStream.foreachRDD(jPairRDD -> {
jPairRDD.foreach(rdd -> {
System.out.println("key="+rdd._1()+" value="+rdd._2());
});
});
jssc.start();
jssc.awaitTermination();
stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
}
下面是我得到的錯誤:
用放電的默認log4j的配置文件: 組織/阿帕奇/火花/ log4j-defaults.properties 17/09/04 11:41:15信息 SparkContext:運行Spark版本2.1.0 17/09/04 11:41:15 WARN NativeCodeLoader:無法爲您的 平臺加載native-hadoop庫...使用builtin-java適用的課程17/09/04 11:41:15 INFO SecurityManager:將視圖acls更改爲:11014525 17/09/04 11:41:15 INFO SecurityManager:將修改acls更改爲: 11014525 17/09/04 11:41:15 INFO SecurityManager:更改查看acls 組到:17/09/04 11:41:15 INFO SecurityManager:將修改 acls組爲:17/09/04 11:41:15 INFO SecurityManager: SecurityManager:身份驗證已禁用;用戶禁用;具有查看權限的用戶 :Set(11014525);具有查看權限的組: Set();擁有修改權限的用戶:Set(11014525);與 修改權限:Set()17/09/04 11:41:15 INFO Utils:成功 在端口56668上啓動服務'sparkDriver'。17/09/04 11:41:15信息 SparkEnv:註冊MapOutputTracker 17/09/04 11:41:15 INFO SparkEnv:註冊BlockManagerMaster 17/09/04 11:41:15信息 BlockManagerMasterEndpoint:使用 org.apache.spark.storage.DefaultTopologyMapper獲取拓撲結構 信息17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 17/09/04 11:41:15 INFO DiskBlockManager: 創建本地目錄爲 C:\ Users \ 11014525 \ AppData \ Local \ Temp \ blockmgr-cba489b9-2458-455a -8c03-4c4395a01d44 17/09/04 11:41:15 INFO備忘錄ryStore:MemoryStore以容量啓動 896.4 MB 17/09/04 11:41:16 INFO SparkEnv:註冊OutputCommitCoordinator 17/09/04 11:41:16 INFO實用程序:成功 在端口4040上啓動服務「SparkUI」。17/09/04 11:41:16信息 SparkUI:將SparkUI綁定到0.0.0.0,並從 開始http://172.16.202.21:4040 17/09/04 11:41:16信息執行程序:在主機localhost上啓動 executor ID驅動程序17/09/04 11:41:16 INFO Utils: 成功啓動服務 'org.apache.spark.network.netty.NettyBlockTransferService'on port 56689. 17/09/04 11:41:16 INFO NettyBlockTransferService:在172.16.202.21上創建的服務器:56689 17/09/04 11:41:16 INFO BlockManager: 使用org.apache.spark.storage.RandomBlockReplicationPolicy for bloc k 複製策略17/09/04 11:41:16 INFO BlockManagerMaster: 註冊BlockManager BlockManagerId(驅動程序,172.16.202.21,56689, 無)17/09/04 11:41:16 INFO BlockManagerMasterEndpoint:註冊 塊管理器172.16.202.21:56689與896.4 MB RAM, BlockManagerId(驅動程序,172.16.202.21,56689,無)17/09/04 11:41:16 INFO BlockManagerMaster:註冊的BlockManager BlockManagerId(驅動程序,172.16.202.21,56689,無)17/09/04 11:41:16 INFO BlockManager:初始化BlockManager:BlockManagerId(驅動程序, 172.16.202。21,56689,None)17/09/04 11:41:16 WARN KafkaUtils:爲executor覆蓋enable.auto.commit爲false 17/09/04 11:41:16 WARN KafkaUtils:覆蓋auto.offset.reset沒有執行者 17/09/04 11:41:16 WARN KafkaUtils:覆蓋執行器group.id到 spark-executor-different id分配給不同的流17/09/04 11:41:16 WARN KafkaUtils:覆蓋receive.buffer.bytes到65536見 KAFKA-3135 17/09/04 11:41:16 INFO DirectKafkaInputDStream:滑動時間 = 10000毫秒17/09/04 11:41:16信息DirectKafkaInputDStream:存儲級別=序列化1x複製17/09/04 11:41:16信息 DirectKafkaInputDStream:Checkpoint interval = null 17/09/04 11:41:16 信息DirectKafkaInputDStream:記住間隔= 10000毫秒17/09/04 11:41:16 INFO DirectKafkaInputDStream:初始化並驗證 [email protected]407b 17/09/04 11:41:16 INFO MappedDStream:Slide time = 10000 ms 17/09/04 11:41:16 INFO MappedDStream:存儲級別=序列化1x複製 17/09/04 11:41:16 INFO MappedDStream:Checkpoint interval = null 17/09/04 11:41:16 INFO MappedDStream:記住interval = 10000 ms 17/09/04 11:41:16 INFO MappedDStream:已初始化並生效 [email protected] 17/09/04 11:41:16 INFO ForEachDStream:Slide time = 10000毫秒17/09/04 11:41:16 信息ForEachDStream:存儲級別=序列化1x複製17/09/04 11:41:16 INFO ForEachDStream:Checkpoint interval = null 17/09/04 11:41:16 INFO ForEachDStream:記住間隔= 10000毫秒17/09/04 11:41:16 INFO ForEachDStream:已初始化並驗證過 [email protected] 17/09/04 11:41:16 ERROR StreamingContext:啓動上下文時出錯,標記爲 it stopped org.apache.kafka.common.config.ConfigException:Missing 需要的配置「partition.assignment.strategy」,它沒有 默認值。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)在 org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48) 在 org.apache。 kafka.clients.consumer.ConsumerConfig。(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer。 KafkaConsumer。(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy。 scala:83) at org.apach e.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) 在 org.apache.spark.streaming在 scala.collection .DStreamGraph $$ anonfun $開始$ 5.apply(DStreamGraph.scala:49) 在 org.apache.spark.streaming.DStreamGraph $$ anonfun $開始$ 5.apply(49 DStreamGraph.scala) .parallel.mutable.ParArray $ ParArrayIterator.foreach_quick(ParArray.scala:143) 在 scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach(ParArray.scala:136) 在 scala.collection.parallel.ParIterableLike $ Foreach.leaf(ParIterableLike.scala :972) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply $ mcV $ sp(Tasks.scala:49) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply (任務。Scala:48) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48) at scala.collection.parallel.Task $ class.tryLeaf(Tasks.scala:51)at scala.collection.parallel.ParIterableLike $ Foreach.tryLeaf(ParIterableLike.scala:969) 在 scala.collection.parallel.AdaptiveWorkStealingTasks $ WrappedTask $ class.compute(Tasks.scala:152) 在 scala.collection.parallel .AdaptiveWorkStealingForkJoinTasks $ WrappedTask.compute(Tasks.scala:443) 在 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260 ) 在 scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread .run(ForkJoinWorkerThread.java:107) at ...在單獨的線程中使用org.apache.spark.util.ThreadUtils運行 ...()at org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext .scala:578) 在 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 在 org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556 ) at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16 INFO ReceiverTracker:ReceiverTracker已停止 17/09/04 11:41:16信息JobGenerator:立即停止JobGenerator 17/09/04 11:41:16 INFO RecurringTimer:JobGenerator的停止計時器 after time -1 17/09/04 11:41:16 INFO JobGenerator:停止 JobGenerator 17/09/04 11:41:16 INFO JobScheduler:停止JobScheduler 線程「主」中的異常 org.apache.kafka.common.config.ConfigException:缺少必需的 配置「partition.assignment.strategy」,它沒有默認的 值。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)在 org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48) 在 org.apache。 kafka.clients.consumer.ConsumerConfig。(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer。 KafkaConsumer。(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy。 scala:83) at org.apach e.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) 在 org.apache.spark.streaming在 scala.collection .DStreamGraph $$ anonfun $開始$ 5.apply(DStreamGraph.scala:49) 在 org.apache.spark.streaming.DStreamGraph $$ anonfun $開始$ 5.apply(49 DStreamGraph.scala) .parallel.mutable.ParArray $ ParArrayIterator.foreach_quick(ParArray.scala:143) 在 scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach(ParArray.scala:136) 在 scala.collection.parallel.ParIterableLike $ Foreach.leaf(ParIterableLike.scala :972) at scala。collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply $ mcV $ sp(Tasks.scala:49) at scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48) 在 scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48) at scala.collection.parallel.Task $ class.tryLeaf(Tasks.scala:51)at scala.collection。 parallel.ParIterableLike $ Foreach.tryLeaf(ParIterableLike.scala:969) 在 scala.collection.parallel.AdaptiveWorkStealingTasks $ WrappedTask $ class.compute(Tasks.scala:152) 在 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks $ WrappedTask。 compute(Tasks.scala:443) at scala.concurrent.forkjoin.Recursi veAction.exec(RecursiveAction.java:160) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 在...在單獨的線程中使用組織運行。 apache.spark.util.ThreadUtils ...()在 org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext.scala:578) 在 org.apache.spark.streaming.StreamingContext.start(聖reamingContext.scala:572) 在 org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) 在 Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16信息SparkContext:從關機調用stop() hook 17/09/04 11:41:16 INFO SparkUI:停止Spark Web UI http://172.16.202.21:4040 17/09/04 11:41:16信息 MapOutputTrackerMasterEndpoint:MapOutputTrackerMasterEndpoint 停止! 17/09/04 11:41:16 INFO MemoryStore:MemoryStore清除 17/09/04 11:41:16 INFO BlockManager:BlockManager停止17/09/04 11:41:16 INFO BlockManagerMaster:BlockManagerMaster停止17/09/04/04 11:41:16 INFO OutputCommitCoordinator $ OutputCommitCoordinatorEndpoint: OutputCommitCoordinator已停止! 17/09/04 11:41:16信息SparkContext: 成功停止SparkContext 17/09/04 11:41:16信息 ShutdownHookManager:Shutdown hook called 17/09/04 11:41:16信息 ShutdownHookManager:刪除目錄 C:\用戶\ 11014525 \應用程序數據\本地\ TEMP \火花37334cdc-9680-4801-8e50-ef3024ed1d8a
的pom.xml
org.apache.spark 火花streaming_2 .11 2.1.0 commons-l ANG 公共琅 2.6 org.apache.kafka kafka_2.10 0.8.2.0 org.apache.spark 火花流 - 卡夫卡0-10_2.10 2.1.1
org.apache.kafka.common.config.ConfigException:缺少必需的配置「partition.assignment.strategy」,它沒有默認值。 可能與它有關 – Stultuske
@bleedcode我根據您提供的pom.xml細節編輯了我的答案。請嘗試一下,讓我知道它是否能解決您的問題。 – abaghel