2017-04-19

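How do I use reduceByKey when the value is itself keyed, i.e. on tuples shaped like (String, (String, Int))?

I'm trying to iterate over an RDD of a text file and count each unique word in the file, then accumulate all the words that follow each unique word, along with their counts. This is what I have so far: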

import org.apache.spark.{SparkConf, SparkContext}

// Connect to the Spark driver
val conf = new SparkConf().setAppName("WordStats").setMaster("local")
val sc = new SparkContext(conf) // creates a new SparkContext

// Load the specified file into an RDD of lines
val lines = sc.textFile(System.getProperty("user.dir") + "/" + "basketball_words_only.txt")

// Split each line into words and emit (word, nextWord, 1) for every adjacent pair
val words = lines.flatMap { line =>
  val wordList = line.split(" ")
  for (i <- 0 until wordList.length - 1)
    yield (wordList(i), wordList(i + 1), 1)
}

[Image: output generated by my current MapReduce program]

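In case I haven't been clear: what I'm trying to do is accumulate, for each word in the file, the set of words that follow it, along with the number of times each of those words follows it.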
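One way to get from these triples to the structure described in the title is to count first and group second: key each triple by the (word, nextWord) pair, sum the counts with `reduceByKey`, then re-key by the first word and group. This is a minimal sketch assuming Spark is on the classpath; the object `FollowerCounts`, the method `followerCounts`, and the sample lines are hypothetical, and the order of pairs inside each group is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FollowerCounts {
  // Hypothetical helper: turns (word, nextWord, 1) triples into
  // word -> [(nextWord, count), ...]
  def followerCounts(words: RDD[(String, String, Int)]): RDD[(String, Iterable[(String, Int)])] =
    words
      // Key by the (word, nextWord) pair so reduceByKey sums the 1s per pair
      .map { case (w, next, n) => ((w, next), n) }
      .reduceByKey(_ + _)
      // Re-key by the first word, keeping (nextWord, count) as the value
      .map { case ((w, next), count) => (w, (next, count)) }
      // Collect every (nextWord, count) pair that belongs to the same word
      .groupByKey()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FollowerCountsSketch").setMaster("local"))
    try {
      // Hypothetical sample input instead of basketball_words_only.txt
      val lines = sc.parallelize(Seq("the ball the ball", "the hoop"))
      val words = lines.flatMap { line =>
        val wordList = line.split(" ")
        for (i <- 0 until wordList.length - 1) yield (wordList(i), wordList(i + 1), 1)
      }
      followerCounts(words).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Because `reduceByKey` combines counts within each partition before shuffling, the later `groupByKey` moves one (nextWord, count) pair per distinct word pair rather than one record per occurrence.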

Answer


If I understand correctly, we have something like this:

val lines: Seq[String] = ... 
val words: Seq[(String, String, Int)] = ... 

And we want something like this:

val frequencies: Map[String, Seq[(String, Int)]] = { 
    words 
    .groupBy(_._1)      // word -> [(w, next, cc), ...] 
    .mapValues { values => 
     values 
     .map { case (w, n, cc) => (n, cc) } 
     .groupBy(_._1)     // next -> [(next, cc), ...] 
     .mapValues(_.reduce(_._2 + _._2)) // next -> sum 
     .toSeq 
    } 
} 

Thanks @adu! You seem to understand exactly what I'm looking for. However, when I add this code to my own, I get two compile errors:
Error:(92, 36) type mismatch; (String, Int) at .mapValues(_.reduce(_._2 + _._2)) // next -> sum
Error:(93, 12) type mismatch; found: Seq[(String, (String, Int))] required: Seq[(String, Int)] at .toSeq
(I placed your code on lines 85-95.) – JGT
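The first error comes from `_.reduce(_._2 + _._2)`: `reduce` on a `Seq[(String, Int)]` needs a function that returns another `(String, Int)`, but `_._2 + _._2` returns an `Int`, and the `.toSeq` error follows from that. A minimal plain-Scala sketch of one fix is to sum the counts directly instead of reducing the tuples; the names `FollowerFrequencies` and `frequencies` are hypothetical, and it uses `.map` rather than `mapValues` (which is deprecated on `Map` in Scala 2.13).

```scala
object FollowerFrequencies {
  // Hypothetical helper: groups (word, nextWord, count) triples into
  // word -> [(nextWord, totalCount), ...]
  def frequencies(words: Seq[(String, String, Int)]): Map[String, Seq[(String, Int)]] =
    words
      .groupBy(_._1) // word -> [(word, next, cc), ...]
      .map { case (word, triples) =>
        val perNext = triples
          .groupBy(_._2) // next -> [(word, next, cc), ...]
          .map { case (next, ts) => (next, ts.map(_._3).sum) } // next -> summed count
          .toSeq
        (word, perNext)
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample triples, as produced by the question's flatMap
    val words = Seq(("the", "ball", 1), ("ball", "the", 1), ("the", "ball", 1), ("the", "hoop", 1))
    frequencies(words).foreach(println)
  }
}
```

On an RDD of the same triples, the counting and grouping would instead be done with `reduceByKey` and `groupByKey`, since `groupBy` on an RDD shuffles every triple.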
