import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.scheduler._
import com.datastax.spark.connector._

object SparkMain extends App {
  System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
  val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaspark").set("spark.streaming.concurrentJobs", "4")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(5))
  val sqlContext = new SQLContext(sc)
  val host = "localhost:2181" // ZooKeeper quorum
  val topicList = List("test", "fb")

  // One receiver-based stream per topic, each saving its messages to Cassandra
  topicList.foreach { topic =>
    val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
    //configureStream(topic, lines)
    lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))
  }

  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      println("Batch completed, total delay: " + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
    }
    override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
      println("inside onReceiverStarted")
    }
    override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
      println("inside onReceiverError")
    }
    override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
      println("inside onReceiverStopped")
    }
    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
      println("inside onBatchSubmitted")
    }
    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
      println("inside onBatchStarted")
    }
  })

  ssc.start()
  println("===========================")
  ssc.awaitTermination()
}

case class test(key: String)

If I use any single topic at a time, each topic works. But when the topic list has more than one topic, after getting the DStream from the Kafka topics the job just keeps printing "inside onBatchSubmitted": processing multiple Kafka topics with a single Spark streaming context hangs at batchSubmitted.

Answer


My bad, I had misconfigured it: it should be setMaster("local[*]") instead of setMaster("local[2]"). Each receiver-based Kafka stream permanently occupies one core, so with two topics the two receivers consumed both local threads and no core was left to process the batches; that is why the job hung right after printing "inside onBatchSubmitted".
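The arithmetic behind the fix: a local master needs at least one more thread than there are receivers, or batch processing can never be scheduled. A minimal sketch of that sizing (the coresNeeded name is just for illustration):

import org.apache.spark.SparkConf

// Each receiver-based stream pins one core, so with local[2] the two
// topic receivers left no thread for batch processing.
val topicList = List("test", "fb")
val coresNeeded = topicList.size + 1 // 2 receivers + at least 1 processing thread
val conf = new SparkConf()
  .setMaster(s"local[$coresNeeded]") // or simply local[*]
  .setAppName("kafkaspark")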


Changed local[2] to local[*] and it works fine.

val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaspark").set("spark.streaming.concurrentJobs","4")
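As an aside, the receiver problem disappears entirely with the direct (receiver-less) Kafka API, which reads all topics through one stream. A minimal sketch, assuming the spark-streaming-kafka 0.8 artifact and a broker at localhost:9092 (both assumptions, not part of the original setup):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream: no receivers, so no cores are permanently pinned and
// local[2] would also have worked. One stream covers every topic.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // assumed broker address
val topics = Set("test", "fb")
val lines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  .map(_._2) // keep only the message value, as in the original code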