2016-04-03 52 views
1

異常:線程「main」org.apache.spark.SparkException:任務不可序列化

我使用 Scala 2.11.8 和 Spark 1.6.1。每當我在 map 函數內部調用類中定義的函數時,就會拋出以下異常:

"Exception in thread "main" org.apache.spark.SparkException: Task not serializable" 

您可以在下面找到我的完整的代碼

package moviestream.recommender 
import java.io 
import java.io.Serializable 

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.mllib.recommendation.ALS 
import org.apache.spark.mllib.recommendation.Rating 
import org.jblas.DoubleMatrix 


class FeatureExtraction {

  /* NOTE(review): the original code failed with
   * "Exception in thread "main" org.apache.spark.SparkException: Task not serializable".
   * Every `val` in a class body is a field, and a Spark closure that reads a
   * field captures `this` -- which drags along the non-serializable
   * SparkContext.  The `allRecs` computation below copies the broadcast
   * handle into a block-local val before using it in the closure, so only
   * that local is captured.
   */

  val conf = new SparkConf().setMaster("local[2]").setAppName("Recommendation")
  val sc = new SparkContext(conf)

  // MovieLens ratings file: "user \t movie \t rating \t timestamp"; keep the first 3 fields.
  val rawData = sc.textFile("data/u.data")
  val rawRatings = rawData.map(_.split("\t").take(3))

  // Build MLlib Rating objects from the raw (user, movie, rating) triples.
  val ratings = rawRatings.map { case Array(user, movie, rating) =>
    Rating(user.toInt, movie.toInt, rating.toDouble)
  }

  // Train the recommendation model with ALS: rank = 50, iterations = 10, lambda = 0.01.
  val model = ALS.train(ratings, 50, 10, 0.01)
  //val model = ALS.trainImplicit(ratings, 50, 10, 0.01, 0.1) // last parameter is alpha

  // Predicted rating for user 789 on movie 123.
  val predictedRating = model.predict(789, 123)
  // Top 10 recommended movies for user id 789 (k = 10).
  val topKRecs = model.recommendProducts(789, 10)

  // Movie metadata file: "id|title|..." -> driver-side Map(id -> title).
  val movies = sc.textFile("data/u.item")
  val titles = movies.map(line => line.split("\\|").take(2))
    .map(array => (array(0).toInt, array(1)))
    .collectAsMap()

  // All ratings made by user 789 (lookup collects them to the driver).
  val moviesForUser = ratings.keyBy(_.user).lookup(789)
  // We could take the 10 movies with the highest ratings, sorting on the
  // `rating` field of the Rating object:
  //moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)
  // ...and look at the titles of the top 10 recommendations for this user:
  //topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)

  // Factor vector of item 567 as a jblas DoubleMatrix.
  val itemId = 567
  val itemFactor = model.productFeatures.lookup(itemId).head
  val itemVector = new DoubleMatrix(itemFactor)

  /* Item-to-item similarity (kept disabled, as in the original).
   * NOTE(review): if enabled, bind `itemVector` to a local val first and
   * keep `cosineSimilarity` out of the closure's captured `this` (e.g. in a
   * serializable object), otherwise the same serialization error returns.
  val sims = model.productFeatures.map { case (id, factor) =>
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
  }
  // Top 10 most similar items by similarity score:
  val sortedSims = sims.top(10)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
  // Sanity check: the most similar item to itself is itself, so take 11 and drop the first.
  val sortedSims2 = sims.top(11)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
  sortedSims2.slice(1, 11).map { case (id, sim) => (titles(id), sim) }.foreach(println)
  println("Result = " + titles(123))
  */

  /** Cosine similarity of two vectors: dot(v1, v2) / (||v1|| * ||v2||).
    *
    * BUG FIX: the original divided by `vect1.norm1() * vect2.norm2()`;
    * cosine similarity uses the L2 norm of *both* vectors.
    */
  def cosineSimilarity(vect1: DoubleMatrix, vect2: DoubleMatrix): Double =
    vect1.dot(vect2) / (vect1.norm2() * vect2.norm2())

  // Squared error of the model's prediction for one (user, movie) pair.
  val actualRating = moviesForUser.take(1)(0)
  val predictedRatings = model.predict(789, actualRating.product)
  //println(predictedRatings)
  val squaredError = math.pow(predictedRatings - actualRating.rating, 2.0)

  // Mean Squared Error over every rated (user, product) pair.
  val usersProducts = ratings.map { case Rating(user, product, rating) => (user, product) }
  val predictions = model.predict(usersProducts).map { case Rating(user, product, rating) =>
    ((user, product), rating)
  }
  val ratingsAndPredictions = ratings.map { case Rating(user, product, rating) =>
    ((user, product), rating)
  }.join(predictions)
  val MSE = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) =>
    math.pow(actual - predicted, 2)
  }.reduce(_ + _) / ratingsAndPredictions.count()
  //println("Mean Squared Error = " + MSE)
  val RMSE = math.sqrt(MSE)
  println("Root Mean Squared Error = " + RMSE)

  /** Average Precision at K (APK).
    *
    * @param actual    the ground-truth item ids
    * @param predicted the ranked list of predicted item ids
    * @param k         cutoff: only the first k predictions are scored
    * @return mean of precision-at-i over the hit positions, normalised by
    *         min(actual.size, k); 1.0 when `actual` is empty (by convention)
    */
  def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
    val predK = predicted.take(k)
    var score = 0.0
    var numHits = 0.0
    for ((p, i) <- predK.zipWithIndex) {
      if (actual.contains(p)) {
        numHits += 1.0
        // precision at rank i+1, accumulated only on hits
        score += numHits / (i.toDouble + 1.0)
      }
    }
    if (actual.isEmpty) 1.0
    else score / scala.math.min(actual.size, k).toDouble
  }

  // APK@10 for user 789: actual rated movies vs. the top-10 recommendations.
  val actualMovies = moviesForUser.map(_.product)
  val predictedMovies = topKRecs.map(_.product)
  //predictedMovies.foreach(println)
  val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
  //println(apk10)

  // Recommend-for-all-users via a broadcast item-factor matrix.
  val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
  val itemMatrix = new DoubleMatrix(itemFactors)
  //println(itemMatrix.rows, itemMatrix.columns)
  val imBroadcast = sc.broadcast(itemMatrix)
  //println(imBroadcast)

  // BUG FIX ("Task not serializable"): bind the broadcast handle to a
  // block-local val so the closure captures only `bc`, not `this`
  // (which holds the non-serializable SparkContext).
  val allRecs = {
    val bc = imBroadcast
    model.userFeatures.map { case (userId, array) =>
      val userVector = new DoubleMatrix(array)
      val scores = bc.value.mmul(userVector)
      val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
      // matrix row index -> product id (MovieLens product ids are 1-based)
      val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
      (userId, recommendedIds)
    }
  }
  // BUG FIX: Spark is lazy -- `println(allRecs)` printed the RDD object and
  // never ran the job.  Trigger an action so the computation (and any
  // remaining serialization error) actually surfaces.
  allRecs.take(5).foreach(println)
}
+1

放置代碼片段很酷,但缺少模型創建等。考慮到上下文,錯誤消息是通用的。 allRecs上沒有觸發任何操作。 Spark很懶。 *結論:這個問題由於許多原因而成爲題外話題*。 – eliasah

+1

^同意@eliasah。還要注意Spark 1.6需要爲scala 2.11手動編譯。你將能夠單元測試這個好,但是你將不能在羣集上部署,除非你有一個手動編譯的火花版本。 – marios

+0

@Humayoo你能給出更多關於RDD所包含的類的詳細信息嗎?model.userFeatures? – mauriciojost

回答

1

正如上面評論所說,這個問題過於寬泛,但可以給出一個猜測。你在 map 內部使用了廣播變量 imBroadcast——它是與 SparkContext 在同一作用域(類的構造體)中聲明的字段,對吧?閉包引用類的字段時會捕獲整個 `this`,而 `this` 持有不可序列化的 SparkContext,因此報「Task not serializable」。解決辦法:把這些函數和值移到一個單獨的可序列化 object 中,或者在閉包外先把需要的值複製到局部變量再引用。

相關問題