2017-08-10 95 views
0

成員這是非常sad.My火花版本是2.1.1,斯卡拉版本是2.11值reduceByKey不是org.apache.spark.rdd.RDD

import org.apache.spark.SparkContext._ 
import com.mufu.wcsa.component.dimension.{DimensionKey, KeyTrait} 
import com.mufu.wcsa.log.LogRecord 
import org.apache.spark.rdd.RDD 

object PV { 

// 
    def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = { 
    val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y) 

我得到這個錯誤

at 1502387780429 
[ERROR] /Users/lemanli/work/project/newcma/wcsa/wcsa_my/wcsavistor/src/main/scala/com/mufu/wcsa/component/stat/PV.scala:25: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(K, Int)] 
[ERROR]  val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y) 

限定有特點

trait KeyTrait[C <: LogRecord,K <: DimensionKey]{ 
    def getKey(c:C):K 
} 

它被編譯,謝謝。

def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = { 
    val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y) 

重點需要重寫訂購[T]。

object ClientStat extends KeyTrait[DetailLogRecord, ClientStat] { 
     implicit val c 

lientStatSorting = new Ordering[ClientStat] { 
    override def compare(x: ClientStat, y: ClientStat): Int = x.key.compare(y.key) 
    } 

     def getKey(detailLogRecord: DetailLogRecord): ClientStat = new ClientStat(detailLogRecord) 
    } 

回答

3

這來自一般使用一對rdd函數。該reduceByKey方法實際上是PairRDDFunctions類,它具有從RDD的隱式轉換的方法:

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) 
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] 

所以它需要一些隱含的類型類。通常在使用簡單的混凝土類型時,這些已經在範圍之內。但是,你應該能夠修改你的方法也需要這些相同的implicits:

def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C])(implicit kt: ClassTag[K], ord: Ordering[K]) 

或者用較新的語法:

def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C]) 
1

reduceByKey是僅在元組的RDDS中定義的方法,即RDD[(K, V)](K,V是隻是一個約定的說,第一密鑰是被第二值)。

約你想達到什麼樣的例子不知道,但是可以肯定的,你需要的RDD裏面的值轉換爲兩個值的元組。

相關問題