2017-08-12 111 views
0

任何人都可以幫助接受返回迭代器listWords()方法mapPartitions。映射分區迭代器返回

object MapPartitionExample { 

    def main(args: Array[String]): Unit = { 

    val conf= new SparkConf().setAppName("MapPartitionExample").setMaster("local[*]") 
    val sc= new SparkContext(conf) 

    val input:RDD[String] = sc.parallelize(List("ABC","DEF","GHU","YHG")) 

    val x= input.mapPartitions(word => listWords(word)) 


    } 

    def listWords(words: Iterator[String]) : util.Iterator[String] = { 

    val arrList = new util.ArrayList[String]() 
    while(words.hasNext) { 
     arrList.add(words.next()) 
    } 
    return arrList.iterator() 
    } 

} 

回答

0

Iterable[NotInferU]預期,但你是通過導入scala.collection.JavaConversions._如下

def listWords(words: Iterator[String]) : Iterator[String] = { 
    val arrList = new util.ArrayList[String]() 
    while(words.hasNext) { 
     arrList.add(words.next()) 
    } 
    import scala.collection.JavaConversions._ 
    return arrList.toList.iterator 
    } 

代碼的休息恢復java.util.Iterator[String]

您需要的java.util.Iterator轉換爲scala Iterator是因爲它是。

我希望答案是mapPartitions使用應該是scala.collection.Iterator,不java.util.Iterator功能的有益

+0

非常感謝.... – Tinku

+0

我的榮幸@ Tinku :)是否有效? –

+0

是的,工作完美。 – Tinku

0

返回類型。我沒有看到你當前的代碼的遠點,但你可以使用Scala的可變集合:

import scala.collection.mutable.ArrayBuffer 

def listWords(words: Iterator[String]) : Iterator[String] = { 
    val arr = ArrayBuffer[String]() 
    while(words.hasNext) { 
    arr += words.next() 
    } 
    arr.toIterator 
} 

我個人倒只是map

def listWords(words: Iterator[String]) : Iterator[String] = { 
    // Some init code 
    words.map(someFunction) 
} 
+0

非常感謝...它的工作。 – Tinku