
I'm trying to do sentiment analysis on tweets with Spark MLlib. After preprocessing the data and converting it into the appropriate format, I call NaiveBayes's train method to get a model, but it fails with an exception (Spark + Scala: NaiveBayes.train - java.util.NoSuchElementException: next on empty iterator). Here is the stack trace:

java.util.NoSuchElementException: next on empty iterator 
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) 
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) 
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64) 
    at scala.collection.IterableLike$class.head(IterableLike.scala:91) 
    at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:108) 
    at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:120) 
    at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:108) 
    at org.apache.spark.mllib.classification.NaiveBayes.run(NaiveBayes.scala:408) 
    at org.apache.spark.mllib.classification.NaiveBayes$.train(NaiveBayes.scala:467) 
    at org.jc.sparknaivebayes.main.NaiveBayesTrain$delayedInit$body.apply(NaiveBayesTrain.scala:53) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) 
    at scala.App$class.main(App.scala:71) 
    at org.jc.sparknaivebayes.main.NaiveBayesTrain$.main(NaiveBayesTrain.scala:12) 
    at org.jc.sparknaivebayes.main.NaiveBayesTrain.main(NaiveBayesTrain.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 

Here is my main method:

    val csvFiles = args(0).split(",")
    val modelStore = args(1)
    val docs = TweetParser.parseAll(csvFiles, sc)
    val termDocs = Tokenizer.tokenizeAll(docs)

    val termDocsRdd = sc.parallelize[TermDoc](termDocs.toSeq)

    val numDocs = termDocsRdd.count()

    //val terms = termDocsRdd.flatMap(_.terms).distinct().collect().sortBy(identity)
    val terms = termDocsRdd.flatMap(_.terms).distinct().sortBy(identity)
    val termDict = new Dictionary(terms)

    //val labels = termDocsRdd.flatMap(_.labels).distinct().collect()
    val labels = termDocsRdd.flatMap(_.labels).distinct()
    val labelDict = new Dictionary(labels)

    val idfs = (termDocsRdd.flatMap(termDoc => termDoc.terms.map((termDoc.doc, _))).distinct().groupBy(_._2) collect {
      case (term, docs) if docs.size > 3 =>
        term -> (numDocs.toDouble / docs.size.toDouble)
    }).collect.toMap

    val tfidfs = termDocsRdd flatMap {
      termDoc =>
        val termPairs: Seq[(Int, Double)] = termDict.tfIdfs(termDoc.terms, idfs)
        termDoc.labels.headOption.map {
          label =>
            val labelId = labelDict.indexOf(label).toDouble
            val vector = Vectors.sparse(termDict.count.toInt, termPairs)
            LabeledPoint(labelId, vector)
        }
    }

    val model = NaiveBayes.train(tfidfs)

Here is the Dictionary class:

class Dictionary(dict: RDD[String]) extends Serializable {

    //val builder = ImmutableBiMap.builder[String, Long]()
    //dict.zipWithIndex.foreach(e => builder.put(e._1, e._2))

    //val termToIndex = builder.build()
    val termToIndex = dict.zipWithIndex()

    //@transient
    //lazy val indexToTerm = termToIndex.inverse()
    lazy val indexToTerm = dict.zipWithIndex().map {
      case (k, v) => (v, k)
    } //converts from (a, 0),(b, 1),(c, 2) to (0, a),(1, b),(2, c)

    val count = termToIndex.count().toInt

    def indexOf(term: String): Int = termToIndex.lookup(term).headOption.getOrElse[Long](-1).toInt

    def valueOf(index: Int): String = indexToTerm.lookup(index).headOption.getOrElse("")

    def tfIdfs(terms: Seq[String], idfs: Map[String, Double]): Seq[(Int, Double)] = {
      val filteredTerms = terms.filter(idfs contains)
      (filteredTerms.groupBy(identity).map {
        case (term, instances) =>
          val indexOfTerm: Int = indexOf(term)
          if (indexOfTerm < 0) (-1, 0.0)
          else (indexOf(term), (instances.size.toDouble / filteredTerms.size.toDouble) * idfs(term))
      }).filter(p => p._1.toInt >= 0).toSeq.sortBy(_._1)
    }

    def vectorize(tfIdfs: Iterable[(Int, Double)]) = {
      Vectors.sparse(dict.count().toInt, tfIdfs.toSeq)
    }
}

The Document class looks like this:

case class Document(docId: String, body: String = "", labels: Set[String] = Set.empty) 

The TermDoc class:

case class TermDoc(doc: String, labels: Set[String], terms: Seq[String]) 

I'm stuck at this step. I really need to get this working, but I'm having a lot of trouble finding useful information. Thanks in advance.

P.S.: This is based on chimpler's blog: https://github.com/chimpler/blog-spark-naive-bayes-reuters/blob/master/src/main/scala/com/chimpler/sparknaivebayesreuters/NaiveBayes.scala

UPDATE: New code for the CSV parser and document building.

import org.apache.spark.SparkContext

import scala.io.Source

/**
  * Created by cespedjo on 14/02/2017.
  */
object TweetParser extends Serializable {

    val headerPart = "polarity"

    val mentionRegex = """@(.)+?\s""".r

    val fullRegex = """(\d+),(.+?),(N|P|NEU|NONE)(,\w+|;\w+)*""".r

    def parseAll(csvFiles: Iterable[String], sc: SparkContext) = csvFiles flatMap (csv => parse(csv, sc))

    def parse(csvFile: String, sc: SparkContext) = {
      val csv = sc.textFile(csvFile)
      val docs = scala.collection.mutable.ArrayBuffer.empty[Document]

      csv.foreach(
        line => if (!line.contains(headerPart)) docs += buildDocument(line)
      )
      docs
      //docs.filter(!_.docId.equals("INVALID"))
    }

    def buildDocument(line: String): Document = {
      val fullRegex(id, txt, snt, opt) = line
      if (id != null && txt != null && snt != null)
        new Document(id, mentionRegex.replaceAllIn(txt, ""), Set(snt))
      else
        new Document("INVALID")
    }
}

case class Document(docId: String, body: String = "", labels: Set[String] = Set.empty) 

I think your error comes from `val vector = Vectors.sparse`. You need to find/post the point in your application's code where it breaks, so you can be sure about the full error message. I had a similar issue and solved it by pushing more data into the vector. BTW, you might look up the sparse Vector class and the operations applied to it for more details –
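A minimal sketch of what this comment hints at (my own illustration, not code from the post): Vectors.sparse takes a size and a sequence of (index, value) pairs, and nothing stops you from building a vector with no active entries. If the training RDD ends up empty or degenerate, the head call inside NaiveBayes.run has nothing to read, which would be consistent with the "next on empty iterator" trace above.

    import org.apache.spark.mllib.linalg.Vectors

    // A sparse vector over 5 features with two non-zero entries.
    val ok = Vectors.sparse(5, Seq((0, 1.0), (3, 2.0)))

    // Also legal, but carries no data at all; a training set made up of such
    // vectors (or an empty RDD of LabeledPoint) is a degenerate input.
    val empty = Vectors.sparse(5, Seq.empty[(Int, Double)])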


Thanks for your comment Karol. I'm new to Spark and Scala, could you please elaborate on your suggestion? I can't understand the "push more data into the vector" part, because I believe that is already covered by the data contained in the RDD, so what data is missing? –


By the way, I looked at the documentation for Vector and it says local vector... Does that mean it can't be used in distributed mode? What should be used for supervised learning when running in distributed mode? –

Answer


I think the problem is that some documents don't contain any term pairs. You can't train on empty data points. Try changing your code to:

val tfidfs = termDocsRdd flatMap {
  termDoc =>
    val termPairs: Seq[(Int, Double)] = termDict.tfIdfs(termDoc.terms, idfs)
    if (termPairs.nonEmpty) {
      termDoc.labels.headOption.map {
        label =>
          val labelId = labelDict.indexOf(label).toDouble
          val vector = Vectors.sparse(termDict.count.toInt, termPairs)
          LabeledPoint(labelId, vector)
      }
    } else {
      None
    }
}
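Because this is a flatMap over an Option, the None branch (documents with no term pairs, or with no labels) is simply dropped, so only non-empty labelled points reach NaiveBayes.train.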

Thanks for the answer, Pascal. But why might the updated code in the question produce an empty RDD? I've tested the code against a few lines from the CSV file and it recognizes the specified pattern, yet no documents get added to the mutable array. –


You seem to have changed the code in some places (the commented-out parts). You are doing lookups against the dictionary, which is an uncollected RDD; that is wrong, because in this code you need the "big picture" (i.e. you don't want to look things up in a partial, per-worker dictionary, but in the global one). So the earlier code with .collect() in front looks right to me, but I don't understand the new code –
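A minimal sketch of what this comment suggests (my own illustration with a hypothetical LocalDictionary name, not code from the post): collect the distinct terms once on the driver and build a plain Map, so that indexOf is a local lookup instead of an RDD lookup for every record.

    // Hypothetical driver-local dictionary built from an already-collected array.
    class LocalDictionary(entries: Array[String]) extends Serializable {
      val termToIndex: Map[String, Int] = entries.zipWithIndex.toMap
      val count: Int = entries.length
      def indexOf(term: String): Int = termToIndex.getOrElse(term, -1)
    }

    // Assumed usage with the RDDs from the question:
    // val termDict  = new LocalDictionary(termDocsRdd.flatMap(_.terms).distinct().collect().sorted)
    // val labelDict = new LocalDictionary(termDocsRdd.flatMap(_.labels).distinct().collect())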


...but I'd like to find a proper way to achieve this without using collect, because I read somewhere that it's not a good option: it forces the data to be gathered on the driver and can cause errors when handling large amounts of data... Any suggestions? By the way, the behaviour I mentioned happens when this line of code runs: val docs = TweetParser.parseAll(csvFiles, sc). I tested with a single file and docs.size is 0. I don't know why this happens, since the pattern works when tested against a single line. –
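A hedged sketch of one likely explanation (my reading, not a confirmed fix): csv.foreach runs on the executors, so the driver-side ArrayBuffer in parse is never updated, which would leave docs.size at 0 even when the regex matches. Keeping the parsing as RDD transformations sidesteps that and also avoids an extra collect:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Sketch of parse returning an RDD[Document] instead of mutating a driver-side
    // buffer from inside foreach (reuses headerPart, buildDocument and Document
    // from the question as-is).
    def parse(csvFile: String, sc: SparkContext): RDD[Document] =
      sc.textFile(csvFile)
        .filter(line => !line.contains(headerPart)) // drop the header row
        .map(buildDocument)                         // parse each line on the executors
        .filter(_.docId != "INVALID")               // discard lines marked invalid

    // parseAll could then union the per-file RDDs, e.g.:
    // def parseAll(csvFiles: Iterable[String], sc: SparkContext): RDD[Document] =
    //   csvFiles.map(parse(_, sc)).reduce(_ union _)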