我有一個spark工作,它從一個cassandra表中運行讀取數據,並將結果轉儲回兩個表中,稍作修改。我的問題是這項工作需要比預期更長的時間。Spark-Cassandra寫入需要比預期更長的時間
的代碼如下:
val range = sc.parallelize(0 to 100)
val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2)
val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4)).groupByKey
val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted))
//STORE 1
rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble)).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5"))
//STORE 2
rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2)).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11"))
對於圍繞一百萬條記錄,STORE 1花費接近40秒,並且STORE 2(輕微修改rdd3)開超過一分鐘。我不知道我錯在哪裏,或者爲什麼要花這麼多時間。我的火花環境如下:
DSE 4.8.9具有6個節點 70 GB RAM 12次芯各自
任何幫助理解。
你嘗試加入rdd3設置檢查點,看看它會更快? –
不是。它不會更快 –