2016-05-31 52 views
3

考慮這裏給出的代碼之前並行化序列,我們應該並行化數據幀像我們培訓

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression 
val training = sparkContext.parallelize(Seq(
    LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), 
    LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), 
    LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), 
    LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))) 

val lr = new LogisticRegression() 
lr.setMaxIter(10).setRegParam(0.01) 

val model1 = lr.fit(training) 

假設我們讀到「培訓」爲使用sqlContext.read(一個數據幀),應 我們仍然這樣做

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this 

或擬合函數將自動執行並行計算/數據時的護理通過一個數據幀

問候,

回答

3

DataFrame是分佈式的數據結構。它既不要求也不可能parallelize它。 SparkConext.parallelize方法僅用於駐留在驅動程序內存中的分佈式本地數據結構。你不應該被用來分發大型數據集更不用說重新分配RDDs或更高級別的數據結構(就像你在你的前面的問題做)

sc.parallelize(trainingData.collect()) 

如果你想RDD/DataframeDataset)之間進行轉換使用方法其目的是爲了做到這一點:

  1. DataFrameRDD

    import org.apache.spark.sql.DataFrame 
    import org.apache.spark.sql.Row 
    import org.apache.spark.rdd.RDD 
    
    val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v") 
    val rdd: RDD[Row] = df.rdd 
    
  2. 形式RDDDataFrame

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2))) 
    val df1: DataFrame = rdd.toDF 
    // or 
    val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext 
    
1

你也許應該看看RDD和數據幀,以及如何在兩者之間轉換的區別:Difference between DataFrame and RDD in Spark

直接回答你的問題:一個數據幀已經針對並行執行進行了優化。你不需要做任何事情,你可以直接將它傳遞給任何火花估計器的fit()方法。並行執行在後臺處理。

相關問題