我有一個廣播變量,我反序列化獲得的RDD其相關性集合沿如下:星火 - 包裹任意RDD與使用ShuffledRDD
val taskBinary: Broadcast[Array[Byte]]
var (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
不過,我想換這個RDD由ShuffledRDD,因爲我需要一個定製的分區適用於它,我做這個:
var wrappedRDD = new ShuffledRDD[_ ,_, _](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())
,但它導致一個錯誤:
Error:unbound wildcard type rdd = new ShuffledRDD[_ ,_, _ ](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) ..................................^
問題是我不知道如何用任何推斷類型替換這些通配符,因爲我認爲它是動態的,我不知道原始rdd的推斷類型是什麼。任何想法我怎麼能解決這個問題?
第一個問題 - 你爲什麼在廣播RDD? –
@GlennieHellesSindholt我正在修改Spark核心,如果您查看源代碼,您會發現DAGscheduler實際上將RDD作爲廣播變量廣播給所有執行者。如上所示,在執行器端啓動任務時,它將被反序列化回RDD。所以現在我已經解釋了這個目的,你有辦法解決這個問題嗎? –
好的 - 我認爲這是應該加入問題的相關信息。通常,當人們嘗試廣播RDD時,他們在錯誤的軌道上;-)也就是說,我已將我的解決方案發布在答案中。 –