2015-11-05 53 views
2

我正在實施Spark LDA模型(通過Scala API),並且在爲我的數據進行必要的格式化步驟時遇到問題。我的原始數據(存儲在文本文件中)採用以下格式,基本上是令牌列表以及它們對應的文檔。一個簡化的例子:準備火花中的LDA數據

doc XXXXX term XXXXX 
1 x  'a'  x 
1 x  'a'  x 
1 x  'b'  x 
2 x  'b'  x 
2 x  'd'  x 
... 

其中XXXXX列是垃圾數據我不在乎。我意識到這是存儲語料庫數據的非典型方式,但這正是我所擁有的。正如我希望從示例中清楚的那樣,在原始數據中每個令牌有一行(因此如果給定術語在文檔中出現5次,對應於5行文本)。

在任何情況下,我需要將這些數據格式化爲運行Spark LDA模型的稀疏項 - 頻率向量,但我不熟悉Scala,因此遇到了一些麻煩。

我開始:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel} 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.rdd.RDD 

val corpus:RDD[Array[String]] = sc.textFile("path/to/data") 
    .map(_.split('\t')).map(x => Array(x(0),x(2))) 

然後我得到的詞彙數據,我需要生成稀疏矢量:

val vocab: RDD[String] = corpus.map(_(1)).distinct() 
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap 

我不知道什麼是正確的映射功能在這裏使用,這樣我可以爲每個文檔結束一個稀疏項的頻率向量,然後將其輸入到LDA模型中。我想我需要沿着這些線路的東西...

val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex 
    .map(x =>(x._2,Vectors.sparse(vocabMap.size, ???))) 

在這一點,我可以運行的實際LDA:

val lda = new LDA().setK(n_topics) 
val ldaModel = lda.run(documents) 

基本上,我沒有什麼函數適用於每個組,以便我可以將詞頻數據(可能是map?)輸入稀疏向量。換句話說,如何在上面的代碼片段中填寫???以實現所需的效果?處理這種

回答

3

方式一:

  • 確保spark-csv包可
  • 將數據加載到數據幀和利益

    val df = sqlContext.read 
        .format("com.databricks.spark.csv") 
        .option("header", "true") 
        .option("inferSchema", "true") // Optional, providing schema is prefered 
        .option("delimiter", "\t") 
        .load("foo.csv") 
        .select($"doc".cast("long").alias("doc"), $"term") 
    
  • 指數term列選擇列:

    import org.apache.spark.ml.feature.StringIndexer 
    
    val indexer = new StringIndexer() 
        .setInputCol("term") 
        .setOutputCol("termIndexed") 
    
    val indexed = indexer.fit(df) 
        .transform(df) 
        .drop("term") 
        .withColumn("termIndexed", $"termIndexed".cast("integer")) 
        .groupBy($"doc", $"termIndexed") 
        .agg(count(lit(1)).alias("cnt").cast("double")) 
    
  • 轉換爲PairwiseRDD

    import org.apache.spark.sql.Row 
    
    val pairs = indexed.map{case Row(doc: Long, term: Int, cnt: Double) => 
        (doc, (term, cnt))} 
    
  • 組由DOC:

    val docs = pairs.groupByKey 
    
  • 創建特徵向量

    import org.apache.spark.mllib.linalg.Vectors 
    import org.apache.spark.sql.functions.max 
    
    val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1 
    
    val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq)) 
    
  • 現在你有所有你需要創建LabeledPoints或應用額外的處理

+1

這很好用!但是,經過一些初步實驗後,我想嘗試將TF-IDF用作預處理步驟。問題是Spark的TF-IDF的[documentation](https://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf)表明,建議沒有一種簡單的方法可以做到這一點給出這個稀疏矢量格式的數據。有什麼建議麼? – moustachio

+0

應該工作得很好。 – zero323