3

你們知道我在哪裏可以找到Spark中多類分類的例子。我花了很多時間在書本和網絡上搜索,到目前爲止,我只知道根據文檔的最新版本,這是可能的。Spark多類分類示例

回答

21

ML

建議在星火2.0+

我們將使用相同的數據如下MLlib。有兩個基本選項。如果模型只支持二元分類(logistic迴歸),並延伸

val trainRawDf = trainRaw.toDF 

import org.apache.spark.ml.feature.{Tokenizer, CountVectorizer, StringIndexer} 
import org.apache.spark.ml.Pipeline 

import org.apache.spark.ml.classification.RandomForestClassifier 

val transformers = Array(
    new StringIndexer().setInputCol("group").setOutputCol("label"), 
    new Tokenizer().setInputCol("text").setOutputCol("tokens"), 
    new CountVectorizer().setInputCol("tokens").setOutputCol("features") 
) 


val rf = new RandomForestClassifier() 
    .setLabelCol("label") 
    .setFeaturesCol("features") 

val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf) 

model.transform(trainRawDf) 

o.a.s.ml.classification.Classifier你可以使用一個-VS:如果Estimator支持multilclass分類外的開箱(例如隨機森林),你可以直接使用它-rest策略:

import org.apache.spark.ml.classification.OneVsRest 
import org.apache.spark.ml.classification.LogisticRegression 

val lr = new LogisticRegression() 
    .setLabelCol("label") 
    .setFeaturesCol("features") 

val ovr = new OneVsRest().setClassifier(lr) 

val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf) 

MLLib

按照official documentation在這個時刻(MLlib 1.6.0)以下方法suppo RT多分類:

  • 迴歸,
  • 決策樹,
  • 隨機森林,
  • 樸素貝葉斯

至少一些例子使用多分類:

一般框架,忽略方法具體參數,幾乎是相同的MLlib的所有其他方法。你必須預先處理您的輸入來創建與代表labelfeatures列,列數據幀:

root 
|-- label: double (nullable = true) 
|-- features: vector (nullable = true) 

RDD[LabeledPoint]

Spark提供了廣泛的有用的工具,旨在促進此過程,包括Feature ExtractorsFeature Transformerspipelines

你會發現一個相當天真的使用下面的隨機森林的例子。

首先讓進口所需的軟件包,並創建虛擬數據:

import sqlContext.implicits._ 
import org.apache.spark.ml.feature.{HashingTF, Tokenizer} 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.ml.feature.StringIndexer 
import org.apache.spark.mllib.tree.RandomForest 
import org.apache.spark.mllib.tree.model.RandomForestModel 
import org.apache.spark.mllib.linalg.{Vectors, Vector} 
import org.apache.spark.mllib.evaluation.MulticlassMetrics 
import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 

case class LabeledRecord(group: String, text: String) 

val trainRaw = sc.parallelize(
    LabeledRecord("foo", "foo v a y b foo") :: 
    LabeledRecord("bar", "x bar y bar v") :: 
    LabeledRecord("bar", "x a y bar z") :: 
    LabeledRecord("foobar", "foo v b bar z") :: 
    LabeledRecord("foo", "foo x") :: 
    LabeledRecord("foobar", "z y x foo a b bar v") :: 
    Nil 
) 

現在讓我們來定義所需的變壓器和加工列Dataset

// Tokenizer to process text fields 
val tokenizer = new Tokenizer() 
    .setInputCol("text") 
    .setOutputCol("words") 

// HashingTF to convert tokens to the feature vector 
val hashingTF = new HashingTF() 
    .setInputCol("words") 
    .setOutputCol("features") 
    .setNumFeatures(10) 

// Indexer to convert String labels to Double 
val indexer = new StringIndexer() 
    .setInputCol("group") 
    .setOutputCol("label") 
    .fit(trainRaw.toDF) 


def transfom(rdd: RDD[LabeledRecord]) = { 
    val tokenized = tokenizer.transform(rdd.toDF) 
    val hashed = hashingTF.transform(tokenized) 
    val indexed = indexer.transform(hashed) 
    indexed 
     .select($"label", $"features") 
     .map{case Row(label: Double, features: Vector) => 
      LabeledPoint(label, features)} 
} 

val train: RDD[LabeledPoint] = transfom(trainRaw) 

請注意,indexer被「裝」在火車上的數據。它只是意味着將用作標籤的分類值轉換爲doubles。要在新數據上使用分類器,必須先使用此indexer對其進行轉換。

接下來,我們可以訓練RF模型:

val numClasses = 3 
val categoricalFeaturesInfo = Map[Int, Int]() 
val numTrees = 10 
val featureSubsetStrategy = "auto" 
val impurity = "gini" 
val maxDepth = 4 
val maxBins = 16 

val model = RandomForest.trainClassifier(
    train, numClasses, categoricalFeaturesInfo, 
    numTrees, featureSubsetStrategy, impurity, 
    maxDepth, maxBins 
) 

最後測試:

val testRaw = sc.parallelize(
    LabeledRecord("foo", "foo foo z z z") :: 
    LabeledRecord("bar", "z bar y y v") :: 
    LabeledRecord("bar", "a a bar a z") :: 
    LabeledRecord("foobar", "foo v b bar z") :: 
    LabeledRecord("foobar", "a foo a bar") :: 
    Nil 
) 

val test: RDD[LabeledPoint] = transfom(testRaw) 

val predsAndLabs = test.map(lp => (model.predict(lp.features), lp.label)) 
val metrics = new MulticlassMetrics(predsAndLabs) 

metrics.precision 
metrics.recall 
+0

你有一些python的例子嗎?還是它只支持scala? –

-2

您正在使用的Spark 1.6,而不是2.1火花? 我認爲問題在於spark 2.1中的transform方法返回一個數據集,該數據集可以隱式轉換爲類型化的RDD,在此之前,它返回一個數據幀或行。

嘗試作爲RDD [LabeledPoint]指定轉換函數的返回類型的診斷,並查看是否得到相同的錯誤。

+0

這是一個評論,而不是答案是不是? –