2016-08-03 187 views
3

我試圖從RDD過濾空值,但失敗。這裏是我的代碼:Spark&Scala - 無法過濾來自RDD的空值

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

val raw_hbaserdd = hBaseRDD.map{ 
    kv => kv._2 
} 

val Ratings = raw_hbaseRDD.map { 
     result => val x = Bytes.toString(result.getValue(Bytes.toBytes("data"),Bytes.toBytes("user"))) 
       val y = Bytes.toString(result.getValue(Bytes.toBytes("data"),Bytes.toBytes("item"))) 
       val z = Bytes.toString(result.getValue(Bytes.toBytes("data"),Bytes.toBytes("rating"))) 

       (x,y, z) 
    } 
Ratings.filter (x => x._1 != null) 

Ratings.foreach(println) 

調試時,空值仍然出現篩選後:

(3359,1494,4) 
(null,null,null) 
(28574,1542,5) 
(null,null,null) 
(12062,1219,5) 
(14068,1459,3) 

任何更好的主意?

+1

你做錯了。 Ratings.filter(x => x._1!= null).foreach(println)將起作用 – Knight71

+0

'val filteredRatings = Ratings.filter(x => x._1!= null)'和'filteredRatings.foreach(println)' 。 –

回答

5
Ratings.filter (x => x._1 != null) 

這實際上變換RDD,但你是不是使用那個特定的RDD。你可以試試

Ratings.filter(_._1 !=null).foreach(println) 
2

RDD是不可變的對象 - RDD上的任何轉換都不會改變原來的RDD,而是會產生一個新的RDD。所以 - 你應該使用RDD從filter,如果你想看到的filter效果(就像你的map結果做)返回:

val result = Ratings.filter (x => x._1 != null) 
result.foreach(println)