2015-11-08 70 views
0

我有一個廣播變量,我反序列化獲得的RDD其相關性集合沿如下:星火 - 包裹任意RDD與使用ShuffledRDD

val taskBinary: Broadcast[Array[Byte]] 
var (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) 

不過,我想換這個RDD由ShuffledRDD,因爲我需要一個定製的分區適用於它,我做這個:

var wrappedRDD = new ShuffledRDD[_ ,_, _](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) 

,但它導致一個錯誤:

Error:unbound wildcard type rdd = new ShuffledRDD[_ ,_, _ ](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) ..................................^

問題是我不知道如何用任何推斷類型替換這些通配符,因爲我認爲它是動態的,我不知道原始rdd的推斷類型是什麼。任何想法我怎麼能解決這個問題?

+1

第一個問題 - 你爲什麼在廣播RDD? –

+0

@GlennieHellesSindholt我正在修改Spark核心,如果您查看源代碼,您會發現DAGscheduler實際上將RDD作爲廣播變量廣播給所有執行者。如上所示,在執行器端啓動任務時,它將被反序列化回RDD。所以現在我已經解釋了這個目的,你有辦法解決這個問題嗎? –

+0

好的 - 我認爲這是應該加入問題的相關信息。通常,當人們嘗試廣播RDD時,他們在錯誤的軌道上;-)也就是說,我已將我的解決方案發布在答案中。 –

回答

1

所以,我相信你的wrappedRDD有幾個問題。報告的錯誤「未綁定通配符類型...」與您在構造函數調用中向rdd變量添加類型定義有關。

(rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) 

應改爲

(rdd, context.getCustomPartitioner()) 

此外,您需要提供的ShuffledRDD一個類型。你可以使用Any

var wrappedRDD = new ShuffledRDD[Any,Any,Any](rdd, context.getCustomPartitioner()) 

但我懷疑你真正想做的事,是定義一個函數,它的類型,並返回一個類型的特定ShuffledRDD這樣的:

def wrapRDD[K:ClassTag, V:ClassTag, C: ClassTag](rdd: RDD[(K, V)]) = { 
    new ShuffledRDD[K, V, C](rdd, context.getCustomPartitioner()) 
} 

val wrappedRDD = wrapRDD[String, String, Combiner](rdd, context.getCustomPartitioner()) 
+0

我試圖初始化RDY,如下所示: * * * var wrappedRDD = new ShuffledRDD [Any,Any, * Error:type mismatch; ** 'found:org.apache.spark.rdd.RDD [_0]其中,類型_0 必需:org.apache.spark.rdd.RDD [_ <:Product2 [Any,Any]] var wrappedRDD = new ShuffledRDD [Any,Any,Any](rdd,context.getCustomPartitioner())' ^ –

+0

至於你提出的第二種解決方案,我真的不會在運行時知道什麼特定類型的** ShuffledRDD **需要被創建,並且因此考慮使用** Any **類型來初始化其關鍵字,值和組合器。但如前所述,這不會編譯。 –

+0

'found:org.apache.spark.rdd.RDD [_0]其中類型_0是必需的:org.apache.spark.rdd.RDD [_ <:Product2 [Any,Any]]'告訴你rdd是傳入沒有正確的類型。也許你需要首先將你的rdd映射到元組? –