2017-02-23 43 views
1

我需要將從HBASE讀取的RDD序列化到alluxio內存文件系統中,以便緩存和更新它定期用於增量式SPARK計算。通過saveAsObject保存RDD,Exception「有一個不可序列化的結果:org.apache.hadoop.hbase.io.ImmutableBytesWritable」

代碼都是這樣的,但碰上名爲例外

val inputTableNameEvent = HBaseTables.TABLE_XXX.tableName 
val namedeRDDName = "EventAllCached2Update" 
val alluxioPath = "alluxio://hadoop1:19998/" 
val fileURI = alluxioPath + namedeRDDName 
val path:AlluxioURI = new AlluxioURI("/"+namedeRDDName) 

val fs:FileSystem = FileSystem.Factory.get() 

val conf = HBaseConfiguration.create() 
conf.set(TableInputFormat.INPUT_TABLE, inputTableNameEvent) 

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
       classOf[org.apache.hadoop.hbase.client.Result]) 
numbers = rdd.count() 
println("rdd count: " + numbers) 
if(fs.exists(path)) 
     fs.delete(path) 
rdd.saveAsObjectFile(fileURI) 

誰能幫助告訴如何ImmutableBytesWritable映射到另一種類型的繞過這個問題?此外,映射需要可恢復,因爲後來我需要使用objectFile來讀取這個保存的對象,並將它變成一個[(ImmutableBytesWritable,Result)] RDD,稍後用於更新和計算。

回答

0

您需要將rdd轉換爲行對象。像下面的東西,然後將其保存到hdfs。解析後的RDD與現在的任何其他rdd數據一樣

val parsedRDD = yourRDD.map(tuple => tuple._2).map(result => (
     Row((result.getRow.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column1".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column2".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column3".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column4".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString) 
    ))) 
相關問題