2017-04-12 112 views
0

我目前正在寫一個Spark Streaming。我的任務非常簡單,只需接收來自kafka的json消息並進行一些文本過濾(包含TEXT1,TEXT2,TEXT3,TEXT4)。代碼看起來像:爲什麼我的Spark Streaming程序處理速度如此之慢?

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 


    messages.foreachRDD { rdd => 

     val originrdd = rdd.count() 

     val record = rdd.map(_._2).filter(x=>x.contains(TEXT1)).filter(x=>x.contains(TEXT2)).filter(x=>x.contains(TEXT3)).filter(x=>x.contains(TEXT4)) 

     val afterrdd = record.count() 

     println("original number of record: ", originrdd) 
     println("after filtering number of records:", afterrdd) 
} 

它是爲每個JSON消息大約4kb的,並且從圍繞卡夫卡50000記錄每1秒。

對於上述任務,處理時間每個批次需要3秒,因此無法實現實時性能。我面臨同樣的任務風暴,並且執行速度更快。

+0

順便說一句,Spark是微批,而不是實時 –

+0

那麼,如果Spark實時無法處理,那麼Spark流的優勢是什麼? – lserlohn

+0

你有多少卡夫卡? Kafka中的分區數量= Spark中的分區數量,一切都可能在一個分區中處理 –

回答

1

那麼,在這個過程中,你已經做了3次不必要的RDD。

val record = rdd.map(_._2).filter(x => { 
    x.contains(TEXT1) && 
    x.contains(TEXT2) && 
    x.contains(TEXT3) && 
    x.contains(TEXT4) 
} 

另外值得一讀。 https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

+0

謝謝,我也嘗試了以上,我發現性能非常接近。假設過濾標準低,大部分記錄將被過濾出來在這種情況下,每個過濾器操作實際上會減小rdd的大小,以便下一個過濾器操作可以在更小的記錄大小上工作,從而減少處理時間。 – lserlohn

相關問題