2016-11-28 170 views
3

我正在訓練一個Word2Vec模型,它具有相當重要的200個單詞(〜100k)。Spark Word2VecModel超過了最大RPC大小以節省空間

Spark的典型W2V建模目前加起來主要由每個字的向量構成的內存使用情況,即:numberOfDimensions*sizeof(float)*numberOfWords。做數學,以上的數量級爲100MB,給予或考慮。
考慮到我仍然在研究我的標記器,並且仍然對最佳向量大小做出了貢獻,實際上我正在對75k-150k字和100至300維字典進行計算,所以我們只需說模型可以達到〜500MB 。

現在一切都很好,直到節省這個模型。這是目前在火花這種方式實現:

override protected def saveImpl(path: String): Unit = { 
    DefaultParamsWriter.saveMetadata(instance, path, sc) 
    val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq) 
    val dataPath = new Path(path, "data").toString 
    sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) 
} 

即:創建1行的數據幀,含有所有矢量的陣列的大的F(L)的行。數據幀保存爲實木複合地板。沒關係......除非......你必須將它運送給執行者。你在集羣模式下執行的操作。

這結束了吹起來的工作,與像這樣一個堆棧跟蹤:

16/11/28 11:29:00 INFO scheduler.DAGScheduler: Job 3 failed: parquet at Word2Vec.scala:311, took 5,208453 s 
16/11/28 11:29:00 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: 
    Serialized task 32:5 was 204136673 bytes, 
    which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). 
    Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 

簡單的代碼來重現(你不能火花外殼局部吧,不過,你需要把它運到集羣) :

object TestW2V { 

def main(args: Array[String]): Unit = { 
    val spark = SparkSession.builder().appName("TestW2V").getOrCreate() 
    import spark.implicits._ 

    // Alphabet 
    val randomChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTYVWXTZ".toCharArray 
    val random = new java.util.Random() 

    // Dictionnary 
    def makeWord(wordLength: Int): String = new String((0 until wordLength).map(_ => randomChars(random.nextInt(randomChars.length))).toArray) 
    val randomWords = for (wordIndex <- 0 to 100000) // Make approx 100 thousand distinct words 
        yield makeWord(random.nextInt(10)+5) 

    // Corpus (make it fairly non trivial) 
    def makeSentence(numberOfWords: Int): Seq[String] = (0 until numberOfWords).map(_ => randomWords(random.nextInt(randomWords.length))) 
    val allWordsDummySentence = randomWords // all words at least once 
    val randomSentences = for (sentenceIndex <- 0 to 100000) 
         yield makeSentence(random.nextInt(10) +5) 
    val corpus: Seq[Seq[String]] = allWordsDummySentence +: randomSentences 

    // Train a W2V model on the corpus 
    val df = spark.createDataFrame(corpus.map(Tuple1.apply)) 
    import org.apache.spark.ml.feature.Word2Vec 
    val w2v = new Word2Vec().setVectorSize(250).setMinCount(1).setInputCol("_1").setNumPartitions(4) 
    val w2vModel = w2v.fit(df) 
    w2vModel.save("/home/Documents/w2v") 

    spark.stop 
} 
} 

現在......我明白了內部不夠好,我想,瞭解爲什麼發生這種情況。這些問題是:

  • 我這樣做的權利(我的API使用正確的嗎?)
  • 我怎麼能解決呢? spark.mllib.feature.Word2VecModel(「基於RDD的1.x版本」)有一個公共構造函數,我可以通過滾動我自己的,適當分區的保存/加載實現來手動處理。但新的spark.ml.feature.Word2VecModel不提供我可以看到的公共構造函數。
  • 如果有火花的貢獻者出現這種情況:這會被認爲是一個錯誤/可能的改進?

考慮火花隊解決了這個JIRA:https://issues.apache.org/jira/browse/SPARK-11994,(這是1.x的API),我想他們做了仔細檢查,對2.0 API,我做錯了:-)。

知道我想我可以在本地模式下運行它,並避免最終的任務序列化,但這是一個臨時解決方案,在生產級別(數據可訪問性和所有...)是不可能的。或者將RPC大小破解爲512MB,當然...

PS:上述情況發生在Spark 2.0.1和Spark獨立羣集(不能在本地模式下重現)。
我通常會發布這種消息到用戶郵件列表,但看到Spark encourages the use of SO,這裏去...

回答

0

我和你有完全一樣的體驗。它可以在本地正常工作,但是在集羣模式下,它會死掉,而不會按照您的建議將RPC大小改爲512mb。

即通過spark.rpc.message.maxSize=512讓我。

而且我也同意保存實現看起來很可疑,特別是在repartition(1)位。

相關問題