0

隨着org.apache.spark.mllib學習算法,我們用來設置管道沒有訓練算法sc.parallelize不是在ML管道與訓練算法工作

var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler 
val pipeline = new Pipeline().setStages(stages) 

然後經過我們二手LabeledPoint獲取數據準備好訓練算法,最後我們用來訓練的東西模型像

val model = GradientBoostedTrees.train(sc.parallelize(trainingData.collect()), boostingStrategy) 

我們必須注意到,如果我們逼債使用「sc.parallelize」噸他的訓練似乎永遠不會結束。

現在用org.apache.spark.ml學習算法(由於setLabelCol & setFeaturesCol),我們可以包括訓練算法也醞釀

val model = new GBTRegressor().setLabelCol(target_col_name).setFeaturesCol("features").setMaxIter(10) 

var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler :+ model 
val pipeline = new Pipeline().setStages(stages) 

但現在當我們通過數據,它節選的數據幀,而不是數據行通過sc.parallelize 所以下面的代碼

val model = pipeline.fit(sc.parallelize(df_train)) 

引發以下錯誤完成:

<console>:57: error: type mismatch; 
found : org.apache.spark.sql.DataFrame 
required: Seq[?] 

而這

val model = pipeline.fit(df_train) 

永遠不會結束。

這個問題的解決方案是什麼?

+0

使用數據框'toDF'將RDD轉換爲Dataframe並返回 - 您只需要添加一個模式(如標題)即可。 – GameOfThrows

+0

@GameOfThrows:對不起,我沒有明白。 df_train已經是一個DataFrame。 – Abhishek

回答

0

您的代碼的主要問題是您正在使用驅動程序作爲數據的橋樑。即您正在將所有分發的數據收集到您的驅動程序並將其傳回給您的所有節點。另一個問題是您實際上使用ML功能,這意味着您的使用DataFrame s而不是RDD s。因此,您需要做的是將您的RDD轉換爲DataFrame。請注意,有很多方法可以實現這一點,您可以查看How to convert RDD Object to DataFrame in Spark,另一種方法是使用toDF方法。

+0

「:type df_train」給了我「org.apache.spark.sql.DataFrame」。所以它上面的錯誤說「df_train」是一個DataFrame而不是一個RDD。 對不起,如果我在這裏失去了一些東西,你能幫助我多一點嗎? – Abhishek

+0

@Abhishek是的,但是當你「收集」它後,你將它轉換爲「Seq」,然後再轉換爲「RDD」,並行化之後。 –

+0

嘿,我明白了,但我仍然不確定如何解決這個問題,你能幫我解決代碼建議嗎?對不起,我是Scala和ML算法的新手。 – Abhishek