Decision tree classifier on a Spark DataFrame with string fields
I managed to get my decision tree classifier working with the RDD-based API, but now I'm trying to switch to the DataFrame-based API in Spark.
I have a dataset like this (but with many more fields):
country, destination, duration, label
Belgium, France, 10, 0
Bosnia, USA, 120, 1
Germany, Spain, 30, 0
First, I load my CSV file into a DataFrame:
val data = session.read
.format("org.apache.spark.csv")
.option("header", "true")
.csv("/home/Datasets/data/dataset.csv")
Then I convert my string columns into numeric columns:
val stringColumns = Array("country", "destination")
val index_transformers = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
Then I assemble all my features into a single vector, using VectorAssembler, like this:
val assembler = new VectorAssembler()
.setInputCols(Array("country_index", "destination_index", "duration_index"))
.setOutputCol("features")
I split my data into training and test sets:
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
Then I create my DecisionTree classifier:
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
Then I use a Pipeline to run all the transformations:
val pipeline = new Pipeline()
.setStages(Array(index_transformers, assembler, dt))
I train my model and use it to make predictions:
val model = pipeline.fit(trainingData)
val predictions = model.transform(testData)
But I get some errors that I don't understand. When I run my code like this, I get this error:
[error] found : Array[org.apache.spark.ml.feature.StringIndexer]
[error] required: org.apache.spark.ml.PipelineStage
[error] .setStages(Array(index_transformers, assembler,dt))
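(For context, the mismatch seems to come from nesting: `index_transformers` is already an `Array[StringIndexer]`, so wrapping it inside another `Array(...)` makes it a single element of the outer array instead of splicing its stages in, and that element is not a `PipelineStage`. A minimal sketch of flattening everything into one `Array[PipelineStage]`, assuming the same vals as above:)

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}

// Concatenate the indexer array with the remaining stages instead of
// nesting arrays, so every element is itself a PipelineStage.
val stages: Array[PipelineStage] =
  index_transformers ++ Array[PipelineStage](assembler, dt)

val pipeline = new Pipeline().setStages(stages)
```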
So what I did is that I added a Pipeline val for my index_transformers, right before the assembler val:
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(data)
val df_indexed = index_model.transform(data)
and I used my new df_indexed DataFrame for my training and test sets, and my pipeline kept only the assembler and dt:
val Array(trainingData, testData) = df_indexed.randomSplit(Array(0.7, 0.3))
val pipeline = new Pipeline()
.setStages(Array(assembler,dt))
Having removed index_transformers, I got this error:
Exception in thread "main" java.lang.IllegalArgumentException: Data type StringType is not supported.
It basically says that I'm using VectorAssembler on strings, but I told it to use df_indexed, which now has the numeric *_index columns; it just doesn't seem to pick them up in the VectorAssembler, and I'm confused...
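(One thing that can cause "Data type StringType is not supported" even after indexing: a CSV read with only the `header` option leaves every column as StringType, including the numeric ones such as duration and label. A hedged sketch of casting them explicitly before assembling; the column names are taken from the sample above and the exact set is an assumption:)

```scala
import org.apache.spark.sql.functions.col

// spark-csv reads every column as StringType unless inferSchema is set,
// so the numeric columns need an explicit cast before VectorAssembler,
// which only accepts numeric/boolean/vector input columns.
val dataTyped = data
  .withColumn("duration", col("duration").cast("double"))
  .withColumn("label", col("label").cast("double"))
```

Alternatively, `.option("inferSchema", "true")` on the reader has a similar effect.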
Thanks
EDIT
Now I've almost managed to get it working:
val data = session.read
.format("org.apache.spark.csv")
.option("header", "true")
.csv("/home/hvfd8529/Datasets/dataOINIS/dataset.csv")
val stringColumns = Array("country", "destination", "duration")
val stringColumns_index = stringColumns.map(c => s"${c}_index")
val index_transformers = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
val assembler = new VectorAssembler()
.setInputCols(stringColumns_index)
.setOutputCol("features")
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.setImpurity("entropy")
.setMaxBins(1000)
.setMaxDepth(15)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels())
val stages = index_transformers :+ assembler :+ labelIndexer :+ dt :+ labelConverter
val pipeline = new Pipeline()
.setStages(stages)
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("accuracy = " + accuracy)
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)
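(An aside about the stage layout, not about the error itself: with the index_transformers prepended, `model.stages(2)` no longer points at the decision tree. Selecting the fitted stage by type is more robust; a sketch:)

```scala
import org.apache.spark.ml.classification.DecisionTreeClassificationModel

// Locate the fitted tree by type instead of by position, since the
// number of indexer stages in front of it can change.
val treeModel = model.stages
  .collectFirst { case t: DecisionTreeClassificationModel => t }
  .getOrElse(sys.error("no DecisionTreeClassificationModel in pipeline"))

println("Learned classification tree model:\n" + treeModel.toDebugString)
```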
Only now I get an error saying:
value labels is not a member of org.apache.spark.ml.feature.StringIndexer
and I don't understand it, because I'm following the example in the Spark docs :/
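(In case it's useful: `labels` is defined on `StringIndexerModel`, the result of `fit`, not on the `StringIndexer` estimator, which is why `labelIndexer.labels()` fails to compile. Following the pattern in the Spark docs, one sketch is to fit the label indexer up-front and reuse the fitted model as a pipeline stage, assuming the vals above:)

```scala
// Fit the label indexer first; the fitted StringIndexerModel exposes .labels.
val labelIndexerModel = labelIndexer.fit(data)

val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexerModel.labels)

// A fitted model can be used as a pipeline stage like any transformer.
val stages = index_transformers :+ assembler :+ labelIndexerModel :+ dt :+ labelConverter
```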
I still get the same error :( I also tried

val stages = index_transformers :+ assembler :+ dt
val pipeline = new Pipeline()
    .setStages(stages)

but it doesn't work: java.lang.IllegalArgumentException: Data type StringType is not supported –