2017-07-20 137 views
-1

我讀一個CSV作爲一個數據幀由如下:如何將一組RelationalGroupedDataset傳遞給一個函數?

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("D:/ModelData.csv") 

然後我組由三列如下返回一個RelationalGroupedDataset

df.groupBy("col1", "col2","col3") 

而且我希望每個分組的數據幀進行發送通過以下功能

def ModelFunction(daf: DataFrame) = { 

    //do some calculation 

      } 

例如,如果我有col1有2個唯一值(0,1)值和col2有2個唯一值(1,2)和co l3有3個唯一值(1,2,3)然後我想通過每個組合分組到模型函數像col1 = 0,col2 = 1,col3 = 1我將有一個數據幀,我想將它傳遞給ModelFunction等三列的每個組合。

我試圖

df.groupBy("col1", "col2","col3").ModelFunction(); 

但它拋出一個錯誤。

任何幫助表示讚賞。

回答

1

簡短的回答是,你不能這樣做。你只能做RelationalGroupedDataset聚合函數(你寫的UDAF或建在那些在org.apache.spark.sql.functions任的)

我看到它,你有幾種選擇方式:

選項1 :與其他組合相比,每個獨特組合的數據量足夠小並且不會過多偏斜。

在這種情況下,你可以這樣做:

val grouped = df.groupBy("col1", "col2","col3").agg(collect_list(struct(all other columns))) 
grouped.as[some case class to represent the data including the combination].map[your own logistic regression function). 

選項2:如果組合的總數量是足夠小,你可以這樣做:

val values: df.select("col1", "col2", "col3").distinct().collect() 

,然後依次通過他們創建一個新的數據幀從每個組合通過做一個過濾器。

選項3:寫你自己的UDAF

的數據是在一個流,而不做迭代的能力,但是,如果你有迴歸的實行相匹配,你可以在此可能會不夠好嘗試寫一個UDAF來做到這一點。參見例如:How to define and use a User-Defined Aggregate Function in Spark SQL?

相關問題