2016-07-29 94 views
0

使用mapPartition比方說,我有以下數據框:星火:使用Scala

var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3) 
val df = sc.parallelize(randomData,2).toDF() 

,我有這個功能,這將是爲mapPartition輸入:

def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] = 
    row.toArray.tail.toIterator 

而且使用地圖分區:

df.mapPartition(trialIterator) 

我有以下錯誤信息:

類型不匹配,預期(迭代[行])=>迭代[NotInferedR],則實際:迭代[(字符串,強度)=>迭代[(字符串,整數)]

我可以明白這是由於我的函數的輸入,輸出類型,但如何解決這個問題呢?

回答

2

如果你想獲得強類型輸入在這個特殊的情況下不使用Dataset[Row]DataFrame),但Dataset[T]其中T(String, Int)。還沒有轉換爲Array,如果分區是空的不叫盲目tail不知道:

def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1) 

randomData 
    .toDS // org.apache.spark.sql.Dataset[(String, Int)] 
    .mapPartitions(trialIterator _) 

randomData.toDF // org.apache.spark.sql.Dataset[Row] 
    .as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)] 
    .mapPartitions(trialIterator _) 
+0

感謝您的回答。這裏的功能只是爲了說明我的問題。不是我想用的那個。爲什麼我不應該使用數據框? –

+2

因爲實際應用'DataFrame'只是一個'數據集[Seq [Any]]',所以你可以簡單地認爲它是無類型的/不安全的。 – zero323

0

你需要使用類型爲Iterator[(String,Int)],而你應該期望Iterator[Row]

def trialIterator(row:Iterator[Row]): Iterator[(String,Int)] = { 
    row.next() 
    row //seems to do the same thing w/o all the conversions 
}