
I managed to get my decision tree classifier working with the RDD-based API, but now I am trying to switch to the DataFrame-based API in Spark: using a decision tree classifier on a DataFrame with string fields.

I have a dataset like this (but with many more fields):

country, destination, duration, label

Belgium, France, 10, 0 
Bosnia, USA, 120, 1 
Germany, Spain, 30, 0 

First, I load my CSV file into a DataFrame:

val data = session.read 
    .format("org.apache.spark.csv") 
    .option("header", "true") 
    .csv("/home/Datasets/data/dataset.csv") 

Then I convert the string columns into numeric index columns:

val stringColumns = Array("country", "destination") 

val index_transformers = stringColumns.map(
    cname => new StringIndexer() 
    .setInputCol(cname) 
    .setOutputCol(s"${cname}_index") 
) 
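One practical detail for the train/test split that comes later: if the test split contains a country or destination value that was not seen when the indexers were fitted, the transform step will fail. Depending on the Spark version, StringIndexer exposes a handleInvalid parameter ("skip" or "error"; newer releases also accept "keep"); a minimal sketch:

val index_transformers = stringColumns.map(
    cname => new StringIndexer() 
    .setInputCol(cname) 
    .setOutputCol(s"${cname}_index") 
    .setHandleInvalid("skip")  // drop rows whose value was not seen during fitting 
) 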

Then I assemble all my features into a single vector with a VectorAssembler, like this:

val assembler = new VectorAssembler() 
    .setInputCols(Array("country_index", "destination_index", "duration_index")) 
    .setOutputCol("features") 

I split my data into training and test sets:

val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3)) 

Then I create my DecisionTree classifier:

val dt = new DecisionTreeClassifier() 
    .setLabelCol("label") 
    .setFeaturesCol("features") 

Then I use a Pipeline to chain all the transformations:

val pipeline = new Pipeline() 
    .setStages(Array(index_transformers, assembler, dt)) 

I train my model and use it to make predictions:

val model = pipeline.fit(trainingData) 

val predictions = model.transform(testData) 

But when I run my code like this, I get an error that I do not understand:

[error] found : Array[org.apache.spark.ml.feature.StringIndexer] 
[error] required: org.apache.spark.ml.PipelineStage 
[error]   .setStages(Array(index_transformers, assembler,dt)) 

So what I did was add a separate pipeline for the index_transformers, right after that val and right before the assembler val:

val index_pipeline = new Pipeline().setStages(index_transformers) 
val index_model = index_pipeline.fit(data) 
val df_indexed = index_model.transform(data) 

and I build my training and test sets from the new df_indexed DataFrame, keeping only the assembler and the DT in my pipeline:

val Array(trainingData, testData) = df_indexed.randomSplit(Array(0.7, 0.3)) 

val pipeline = new Pipeline() 
    .setStages(Array(assembler,dt)) 

With the index_transformers removed, I get this error:

Exception in thread "main" java.lang.IllegalArgumentException: Data type StringType is not supported. 

It basically says that I am using the VectorAssembler on strings, whereas I am telling it to run on df_indexed, which now has numeric _index columns. It does not seem to pick those columns up in the VectorAssembler, and I just do not understand why.
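One way to narrow this down is to print the schema of df_indexed and check which of the assembler's input columns is still typed as a string; if, for instance, the raw duration column is the culprit (it is read as a string when the CSV is loaded without schema inference), it can be cast to a numeric type. A minimal sketch, assuming the column is named duration:

import org.apache.spark.sql.types.DoubleType 

df_indexed.printSchema()  // shows the type of every column the assembler will read 

val df_numeric = df_indexed 
    .withColumn("duration", df_indexed("duration").cast(DoubleType)) 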

Thanks

EDIT

Now I have almost managed to get it working:

val data = session.read 
    .format("org.apache.spark.csv") 
    .option("header", "true") 
    .csv("/home/hvfd8529/Datasets/dataOINIS/dataset.csv") 

val stringColumns = Array("country_index", "destination_index", "duration_index") 

val stringColumns_index = stringColumns.map(c => s"${c}_index") 

val index_transformers = stringColumns.map(
    cname => new StringIndexer() 
    .setInputCol(cname) 
    .setOutputCol(s"${cname}_index") 
) 

val assembler = new VectorAssembler() 
    .setInputCols(stringColumns_index) 
    .setOutputCol("features") 

val labelIndexer = new StringIndexer() 
    .setInputCol("label") 
    .setOutputCol("indexedLabel") 

val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3)) 

// Train a DecisionTree model. 
val dt = new DecisionTreeClassifier() 
    .setLabelCol("indexedLabel") 
    .setFeaturesCol("features") 
    .setImpurity("entropy") 
    .setMaxBins(1000) 
    .setMaxDepth(15) 

// Convert indexed labels back to original labels. 
val labelConverter = new IndexToString() 
    .setInputCol("prediction") 
    .setOutputCol("predictedLabel") 
    .setLabels(labelIndexer.labels()) 

val stages = index_transformers :+ assembler :+ labelIndexer :+ dt :+ labelConverter 

val pipeline = new Pipeline() 
    .setStages(stages) 


// Train model. This also runs the indexers. 
val model = pipeline.fit(trainingData) 

// Make predictions. 
val predictions = model.transform(testData) 

// Select example rows to display. 
predictions.select("predictedLabel", "label", "indexedFeatures").show(5) 

// Select (prediction, true label) and compute test error. 
val evaluator = new MulticlassClassificationEvaluator() 
    .setLabelCol("indexedLabel") 
    .setPredictionCol("prediction") 
    .setMetricName("accuracy") 
val accuracy = evaluator.evaluate(predictions) 
println("accuracy = " + accuracy) 

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel] 
println("Learned classification tree model:\n" + treeModel.toDebugString) 

Except that now I get an error that says:

value labels is not a member of org.apache.spark.ml.feature.StringIndexer 

and I do not understand it, since I am following the example from the Spark docs :/

Answers


What I did for my first problem:

val stages = index_transformers :+ assembler :+ labelIndexer :+ dt :+ labelConverter 

val pipeline = new Pipeline() 
    .setStages(stages) 
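Presumably this works because :+ builds a flat array whose element type widens to a common supertype of the indexers, the assembler, the classifier and the converters, which is what setStages needs. The same thing can be written with the element type made explicit; a small sketch:

import org.apache.spark.ml.PipelineStage 

val stages: Array[PipelineStage] = 
    index_transformers ++ Array[PipelineStage](assembler, labelIndexer, dt, labelConverter) 

val pipeline = new Pipeline() 
    .setStages(stages) 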

For my second problem about the labels, I needed to use .fit(data), like this:

val labelIndexer = new StringIndexer() 
    .setInputCol("label_fraude") 
    .setOutputCol("indexedLabel") 
    .fit(data) 
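With fit applied, labelIndexer is a StringIndexerModel rather than a plain StringIndexer, so .labels (no parentheses) is available and can be passed to the IndexToString converter; a minimal sketch:

val labelConverter = new IndexToString() 
    .setInputCol("prediction") 
    .setOutputCol("predictedLabel") 
    .setLabels(labelIndexer.labels) 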
Another answer:

It should be:

val pipeline = new Pipeline() 
    .setStages(index_transformers ++ Array(assembler, dt): Array[PipelineStage]) 

Comment from the asker: I still get the same error :( I also tried

val stages = index_transformers :+ assembler :+ dt 
val pipeline = new Pipeline() 
    .setStages(stages) 

but it does not work: java.lang.IllegalArgumentException: Data type StringType is not supported.