2017-06-22 49 views
1

我們對具有13種不同ETL操作的系統使用Scala的Spark 2.x。其中7個相對簡單,每個都由一個域類驅動,主要區別在於這個類和處理負載的一些細微差別。Spark/Scala,數據集和案例類的多態性

負載類的簡化版本被如下,對於本例的目的說,有被加載7點比薩餅的澆頭,這裏的辣:

object LoadPepperoni { 
    def apply(inputFile: Dataset[Row], 
      historicalData: Dataset[Pepperoni], 
      mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row => 
     PepperoniRaw(
      weight = row.getAs[String]("weight"), 
      cost = row.getAs[String]("cost") 
     ) 
    }.toDS() 

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data 

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data 

    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw => 
     Pepperoni(value = ???, key1 = ???, key2 = ???) 
    }.toDS() 

    val joinedData = dedupedData.joinWith(historicalData, 
     historicalData.col("key1") === dedupedData.col("key1") && 
     historicalData.col("key2") === dedupedData.col("key2"), 
     "right_outer" 
    ) 

    joinedData.map { case (hist, delta) => 
     if(/* some condition */) { 
     hist.copy(value = /* some transformation */) 
     } 
    }.flatMap(list => list).toDS() 
    } 
} 

換句話說類執行一系列對數據進行操作時,操作大部分是相同的,並且總是按照相同的順序進行,但每個頂點可能略有不同,從「原始」到「域」和合並函數的映射也會如此。

要做到這一點爲7澆頭(即蘑菇,奶酪等),我寧願不簡單地複製/粘貼類和更改所有的名稱,因爲結構和邏輯是所有負載共同。相反,我寧願定義一個通用的「裝載」類的泛型類型,像這樣:

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D): Dataset[D] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row => 
... 

對於每一個特定類的操作,例如:從「生」到「域」映射,或合併,有特徵或抽象類來實現細節。這將是一個典型的依賴注入/多態模式。

但我遇到了一些問題。從Spark 2.x開始,編碼器僅提供給本地類型和案例類別,並且沒有辦法一般地將類別標識爲案例類別。因此,當使用泛型類型時,推斷的toDS()和其他隱式功能不可用。

同樣如this related question of mine中所述,使用泛型時,case類copy方法也不可用。我已經研究過Scala和Haskell常見的其他設計模式,例如類型類或ad-hoc多態,但障礙是Spark數據集基本上只處理不能抽象定義的案例類。

看來這將是Spark系統中的一個常見問題,但我無法找到解決方案。任何幫助讚賞。

回答

2

,使.toDS的隱式轉換:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T] 

(從https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits

你是完全正確的,有計劃的涵蓋範圍爲Encoder[T]沒有隱含的價值現在你已經取得了你的申請方法通用的,所以這種轉換不會發生。但是你可以簡單地接受一個作爲隱式參數!

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = { 
... 

然後當時你呼叫負載,具有特定的類型,它應該是能夠找到這種類型的編碼器。請注意,您在調用上下文中也必須使用import sparkSession.implicits._

編輯:類似的方法是通過限制類型(apply[R, D <: Product])並接受隱含的JavaUniverse.TypeTag[D]作爲參數來啓用隱式的newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T]

+0

謝謝你,是一種心靈鼓風機讓我試試看。我遇到了與aggregateByKey()和copy()類似的錯誤(請參閱我的文章中的相關問題鏈接),是否有任何類似的魔法可以將所需的實現納入範圍? –

+0

更一般地說,這種方法是追蹤缺少的特定隱含,並將其作爲完全指定類型的調用站點的一個參數。在aggregateByKey的情況下,它看起來像你需要'ClassTag [D]'(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )。如果我有任何好的想法,我會分開看看你的關於「copy」的問題,並在那裏回覆 –