我們對具有13種不同ETL操作的系統使用Scala的Spark 2.x。其中7個相對簡單,每個都由一個域類驅動,主要區別在於這個類和處理負載的一些細微差別。Spark/Scala,數據集和案例類的多態性
負載類的簡化版本被如下,對於本例的目的說,有被加載7點比薩餅的澆頭,這裏的辣:
object LoadPepperoni {
def apply(inputFile: Dataset[Row],
historicalData: Dataset[Pepperoni],
mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row =>
PepperoniRaw(
weight = row.getAs[String]("weight"),
cost = row.getAs[String]("cost")
)
}.toDS()
val validatedData: Dataset[PepperoniRaw] = ??? // validate the data
val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data
val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw =>
Pepperoni(value = ???, key1 = ???, key2 = ???)
}.toDS()
val joinedData = dedupedData.joinWith(historicalData,
historicalData.col("key1") === dedupedData.col("key1") &&
historicalData.col("key2") === dedupedData.col("key2"),
"right_outer"
)
joinedData.map { case (hist, delta) =>
if(/* some condition */) {
hist.copy(value = /* some transformation */)
}
}.flatMap(list => list).toDS()
}
}
換句話說類執行一系列對數據進行操作時,操作大部分是相同的,並且總是按照相同的順序進行,但每個頂點可能略有不同,從「原始」到「域」和合並函數的映射也會如此。
要做到這一點爲7澆頭(即蘑菇,奶酪等),我寧願不簡單地複製/粘貼類和更改所有的名稱,因爲結構和邏輯是所有負載共同。相反,我寧願定義一個通用的「裝載」類的泛型類型,像這樣:
object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D): Dataset[D] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...
對於每一個特定類的操作,例如:從「生」到「域」映射,或合併,有特徵或抽象類來實現細節。這將是一個典型的依賴注入/多態模式。
但我遇到了一些問題。從Spark 2.x開始,編碼器僅提供給本地類型和案例類別,並且沒有辦法一般地將類別標識爲案例類別。因此,當使用泛型類型時,推斷的toDS()和其他隱式功能不可用。
同樣如this related question of mine中所述,使用泛型時,case類copy
方法也不可用。我已經研究過Scala和Haskell常見的其他設計模式,例如類型類或ad-hoc多態,但障礙是Spark數據集基本上只處理不能抽象定義的案例類。
看來這將是Spark系統中的一個常見問題,但我無法找到解決方案。任何幫助讚賞。
謝謝你,是一種心靈鼓風機讓我試試看。我遇到了與aggregateByKey()和copy()類似的錯誤(請參閱我的文章中的相關問題鏈接),是否有任何類似的魔法可以將所需的實現納入範圍? –
更一般地說,這種方法是追蹤缺少的特定隱含,並將其作爲完全指定類型的調用站點的一個參數。在aggregateByKey的情況下,它看起來像你需要'ClassTag [D]'(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )。如果我有任何好的想法,我會分開看看你的關於「copy」的問題,並在那裏回覆 –