0

我有一個spark工作,它從一個cassandra表中運行讀取數據,並將結果轉儲回兩個表中,稍作修改。我的問題是這項工作需要比預期更長的時間。Spark-Cassandra寫入需要比預期更長的時間

的代碼如下:

val range = sc.parallelize(0 to 100) 

val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2) 

val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4)).groupByKey 

val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted)) 

//STORE 1 

rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble)).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5")) 

//STORE 2 

rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2)).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11")) 

對於圍繞一百萬條記錄,STORE 1花費接近40秒,並且STORE 2(輕微修改rdd3)開超過一分鐘。我不知道我錯在哪裏,或者爲什麼要花這麼多時間。我的火花環境如下:

DSE 4.8.9具有6個節點 70 GB RAM 12次芯各自

任何幫助理解。

+0

你嘗試加入rdd3設置檢查點,看看它會更快? –

+0

不是。它不會更快 –

回答

0

讓我做我的猜測。需要日誌,perf監視輸出和C *數據模型才能獲得更精確的答案。 但一些數學: 你有

  • joinWithCassandra - 隨機C *閱讀
  • saveToCassandra - 秒C *寫
  • 火花重新分配? /減少

(我希望saveToCassadndra需要的所有時間的一半) ,如果你需要之前不要運行任何疑問減去12-20秒,火花開始執行者和其他的東西

爲SO在6個節點上的1M條目和你得到的40秒: 1000000/6/40 = 4166記錄/秒/節點。這並不壞。每個節點10K/s的混合工作負載是一個很好的結果。

第二次寫入比第二次寫入大2倍(11列與5比較),它會在第一次寫入後運行,因此我希望卡桑德拉在此時開始將先前的數據溢出到磁盤,以便在此處獲得更多的性能下降。

我是否正確理解當您添加rdd3.cache()調用時,第二次運行沒有任何更改?那很奇怪。

是的,你可以得到更好的與C *數據模型的調整結果和Spark/C *參數

相關問題