2015-06-02 73 views
0

我將我的所有代碼到Scala和我在pySpark一個功能,我對如何翻譯過來階些許端倪。任何人都可以提供幫助並提供解釋嗎? 的PySpark看起來是這樣的:星火AggregateByKey從pySpark到斯卡拉

.aggregateByKey((0.0, 0.0, 0.0), 
         lambda (sum, sum2, count), value: (sum + value, sum2 + value**2, count+1.0), 
         lambda (suma, sum2a, counta), (sumb, sum2b, countb): (suma + sumb, sum2a + sum2b, counta + countb)) 

編輯: 我至今是:

val dataSusRDD = numFilterRDD.aggregateByKey((0,0,0), (sum, sum2, count) => 

但是我無法理解的是你如何寫這篇文章的Scala,因爲該組的函數然後將該值指定爲一組動作(總和+值等)。進入第二個聚合函數,所有的語法都是正確的。在這種情況下很難一致地說明我的麻煩。它更多,所以我不理解Scala和何時使用括號,括號VS中,VS,逗號

+0

SO不是代碼翻譯服務。你卡在哪裏?你不懂什麼? –

+0

增加,說明我的想法編輯, – theMadKing

+1

有aggregateByKey的答案我寫到這裏Scala的例子:http://stackoverflow.com/a/29953122/21755。通過編寫非匿名函數開始可能會使學習曲線稍微陡峭 –

回答

3

正如@保羅建議使用命名函數可能會使理解什麼在簡單一點去。

val initialValue = (0.0,0.0,0.0) 
def seqOp(u: (Double, Double, Double), v: Double) = (u._1 + v, u._2 + v*v, u._3 + 1) 
def combOp(u1: (Double, Double, Double), u2: (Double, Double, Double)) = (u1._1 + u2._1, u1._2 + u2._2, u1._3 + u2._3) 
rdd.aggregateByKey(initialValue)(seqOp, combOp) 
+0

您好,我收到以下問題:scala> val dataStatsRDD = numFilterRDD.aggregateByKey(initialValue,seqOp,combOp) :38:error:too許多參數爲方法aggregateByKey:(零值:U)(SEQOP:(U,雙)=> U,combOp:(U,U)=> U)(隱式證據$ 3:scala.reflect.ClassTag [U])的有機apache.spark.rdd.RDD [(中間體,U)] VAL dataStatsRDD = numFilterRDD.aggregateByKey(初值,SEQOP,combOp) – theMadKing

+0

我的壞,我忘記了它與Scala中局部應用程序傳遞。我已經更新了答案。 'rdd.aggregateByKey(initialValue)(seqOp,combOp) ' – Holden