2017-02-18 50 views
0

我在spark-scala上做了一些基本的工作我的星火代碼計算每個組織的員工有什麼不對?

要求是顯示每個組織的僱員數量。

我已經通過使用groupByKey和Mapvalues達到了相同的要求,並且通過創建keyValueRDD作爲數組((CTS,1)(CTS,1),(TCS,1)),然後獲得了相同的要求,然後對其應用reduceByKey((x,y)=> x + y)。兩者產生了正確的預期結果

現在我正在嘗試以下樣式的邏輯。我想使用reduceByKey,但我不希望將硬編碼值的KeyValueRDD設置爲1來實現員工數量。

請幫我改變下面的代碼以獲得預期的輸出。另外我想知道爲什麼我在我的代碼中出現錯誤的輸出

由於reduceByKey是可交換的,我得到不同的輸出。

scala> myList 
res34: List[String] = List(100|Surender|CTS|CHN, 101|Raja|CTS|CHN, 102|Kumar|TCS|BNG) 

scala> val listRDD = sc.parallelize(myList) 
listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:23 

scala> val mapRDD = listRDD.map(elem => elem.split("\\|")) 
mapRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:25 

scala> val keyValueRDD = mapRDD.map(elem => (elem(2),elem(0).toInt)) 
keyValueRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:27 

scala> val resultRDD = keyValueRDD.reduceByKey((x,y) => { var incr = 0 ; incr+1 }) 
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:29 

scala> resultRDD.collect 
res36: Array[(String, Int)] = Array((TCS,102), (CTS,1) 

預期輸出:

Array((TCS,1), (CTS,2) 

回答

0

即使問題明確指出它不希望映射超過1硬編碼值,它絕對是下面所顯示的,以正確的方式去。

scala> keyValueRDD.map({case (x,y) => x -> 1 }).reduceByKey(_ + _).collect() 
res46: Array[(String, Int)] = Array((TCS,1), (CTS,2)) 

如果你理解的火花是如何運作的,你永遠不會應該寫代碼必須像這樣{ var incr = 0 ; incr+1 }其中一個功能拉姆達的預期。

reduceByKey應該採用累加器的兩個參數,並且當前值被減少,並且它必須返回累加器的新值。在你的代碼中,你總是返回1,因爲對於每個減少的值,incr變量被實例化爲0。因此累加器值始終保持爲1.這就解釋了爲什麼CTS在有缺陷的結果中值爲1。

對於TCS來說,由於spark看到關鍵的TCS只有一條記錄,所以它不需要進一步減少它並因此返回它的原始值。