2015-02-06
2

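Bigram frequency of query log events with Apache Spark

I want to study user actions in sessions extracted from search engine query logs. I first define two kinds of actions: queries and clicks.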

sealed trait Action{} 
case class Query(val input:String) extends Action 
case class Click(val link:String) extends Action 

Suppose the first action in the query log occurs at the following timestamp, in milliseconds:

val t0 = 1417444964686L // 2014-12-01 15:42:44 

Let us define a corpus of time-ordered actions, each associated with a session ID.

val query_log:Array[(String, (Action, Long))] = Array (
("session1",(Query("query1"),t0)), 
("session1",(Click("link1") ,t0+1000)), 
("session1",(Click("link2") ,t0+2000)), 
("session1",(Query("query2"),t0+3000)), 
("session1",(Click("link3") ,t0+4000)), 
("session2",(Query("query3"),t0+5000)), 
("session2",(Click("link4") ,t0+6000)), 
("session2",(Query("query4"),t0+7000)), 
("session2",(Query("query5"),t0+8000)), 
("session2",(Click("link5") ,t0+9000)), 
("session2",(Click("link6") ,t0+10000)), 
("session3",(Query("query6"),t0+11000)) 
) 

We create an RDD from this query_log:

import org.apache.spark.rdd.RDD 
var logs:RDD[(String, (Action, Long))] = sc.makeRDD(query_log) 

The logs are then grouped by session ID:

val sessions_groups:RDD[(String, Iterable[(Action, Long)])] = logs.groupByKey().cache() 
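
To see the shape that this grouping produces without starting Spark, the same step can be sketched on a local collection. This is only an analogy: groupBy on a Scala Seq stands in for groupByKey on an RDD, and the object name LocalGrouping and its three-entry sample log are made up for illustration.

```scala
sealed trait Action
case class Query(input: String) extends Action
case class Click(link: String) extends Action

object LocalGrouping {
  // Hypothetical three-entry log, same tuple layout as query_log above.
  val sample: Seq[(String, (Action, Long))] = Seq(
    ("session1", (Query("query1"), 0L)),
    ("session1", (Click("link1"), 1000L)),
    ("session2", (Query("query3"), 5000L))
  )

  // Local stand-in for groupByKey: session ID -> all (action, timestamp) pairs.
  val grouped: Map[String, Seq[(Action, Long)]] =
    sample.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2) }

  def main(args: Array[String]): Unit =
    grouped.foreach { case (id, actions) => println(s"$id -> $actions") }
}
```

Each session ID ends up with the collection of its own (action, timestamp) pairs, which is the kind of value that initFromActions receives for one session.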

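Now we want to study co-occurrences of actions within a session, for example the number of query rewritings in a session. We therefore define a class Cooccurrences that is initialized from the actions of a session.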

case class Cooccurrences(
    var numQueriesWithClicks:Int = 0, 
    var numQueries:Int = 0, 
    var numRewritings:Int = 0, 
    var numQueriesBeforeClicks:Int = 0 
) { 
// The Cooccurrences object is initialized from the list of timestamped actions of one session group 
    def initFromActions(actions:Iterable[(Action, Long)]): Unit = { 
    // 30 seconds is the maximal time (in milliseconds) between two queries (q1, q2) for q2 to be considered a rewriting of q1 
    val thirtySeconds = 30000 
    var hasClicked = false 
    var hasRewritten = false 
    // in the observed action sequence, we extract consecutive actions (sliding(2)) sorted by timestamp 
    // for each bigram in the sequence, we update the counters of the Cooccurrences object 
    actions.toSeq.sortBy(_._2).sliding(2).foreach{ 
     // case Seq(l0) => // session with only one Action 
     case Seq((e1:Click, t0)) => { // click without any query 
     numQueries = 0   
     } 
     case Seq((e1:Query, t0)) => { // query without any click 
     numQueries = 1   
     numQueriesBeforeClicks = 1 
     } 
     // case Seq(l0, l1) => // session with at least two Actions 
     case Seq((e1:Click, t0), (e2:Query, t1)) => { // a click followed by a query 
     if(! hasClicked) 
      numQueriesBeforeClicks = numQueries 
     hasClicked = true 
     } 
     case Seq((e1:Click, t0), (e2:Click, t1)) => { // two consecutive clicks 
     if(! hasClicked) 
      numQueriesBeforeClicks = numQueries 
     hasClicked = true 
     } 
     case Seq((e1:Query, t0), (e2:Click, t1)) => { // a query followed by a click 
     numQueries += 1 
     if(! hasClicked) 
      numQueriesBeforeClicks = numQueries 
     hasClicked = true 
     numQueriesWithClicks +=1 
     } 
     case Seq((e1:Query, t0), (e2:Query, t1)) => { // two consecutive queries 
     val dt = t1 - t0 
     numQueries += 1 
     if(dt < thirtySeconds && e1.input != e2.input){ 
      hasRewritten = true 
      numRewritings += 1 
     } 
     } 
    } 
    } 

}
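
The single-action cases in the match exist because sliding(2) still yields one short window when a session holds a single action. A minimal sketch of that behaviour on plain Scala collections follows; SlidingDemo and windows are hypothetical names used only here.

```scala
object SlidingDemo {
  // All windows of (up to) two consecutive elements, as initFromActions sees them.
  def windows[A](xs: Seq[A]): List[Seq[A]] = xs.sliding(2).toList

  def main(args: Array[String]): Unit = {
    println(windows(Seq("q1")))             // one window holding a single element
    println(windows(Seq("q1", "c1", "q2"))) // two windows of two elements each
    println(windows(Seq.empty[String]))     // no window at all
  }
}
```

An empty session produces no window, so the foreach body never runs for it and all counters keep their default of 0.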

Now, let us try to compute an RDD of Cooccurrences, one per session:

val session_cooc_stats:RDD[Cooccurrences] = sessions_groups.map{ 
    case (sessionId, actions) => { 
    var coocs = Cooccurrences() 
    coocs.initFromActions(actions) 
    coocs 
    } 
} 

Unfortunately, this raises the following MatchError:

scala> session_cooc_stats.take(2) 

15/02/06 22:50:08 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 4) 
scala.MatchError: List((Query(query3),1417444969686), (Click(link4),1417444970686)) (of class scala.collection.immutable.$colon$colon) 
    at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29) 
    at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at $line25.$read$$iwC$$iwC$Cooccurrences.initFromActions(<console>:29) 
    at $line28.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31) 
    at $line28.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:28) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) 
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
15/02/06 22:50:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 4, localhost): scala.MatchError: List((Query(query3),1417444969686), (Click(link4),1417444970686)) (of class scala.collection.immutable.$colon$colon) 
    at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29) 
    at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29) 
... 
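
One possible way to investigate a MatchError like this one (a diagnostic sketch, not a fix) is to add a catch-all case that reports the runtime class of every action in the unmatched window, so that unexpected class names become visible. The names MatchDiagnostics and describe are hypothetical, and the example runs on plain Scala collections rather than inside Spark.

```scala
sealed trait Action
case class Query(input: String) extends Action
case class Click(link: String) extends Action

object MatchDiagnostics {
  // Classify one window produced by sliding(2); never throws a MatchError.
  def describe(window: Seq[(Action, Long)]): String = window match {
    case Seq((_: Query, _), (_: Click, _)) => "query then click"
    case Seq((_: Query, _), (_: Query, _)) => "query then query"
    case Seq((_: Click, _), _)             => "click then something"
    case Seq(_)                            => "single action"
    // Catch-all: report the runtime classes instead of failing.
    case other =>
      "unmatched: " + other.map(_._1.getClass.getName).mkString(", ")
  }

  def main(args: Array[String]): Unit = {
    println(describe(Seq((Query("query3"), 0L), (Click("link4"), 1000L))))
    println(describe(Seq.empty))
  }
}
```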

If I build my own list of actions, equivalent to the first session group fed to session_cooc_stats:

val actions:Iterable[(Action, Long)] = Array(
(Query("query1"),t0), 
(Click("link1") ,t0+1000), 
(Click("link2") ,t0+2000), 
(Query("query2"),t0+3000), 
(Click("link3") ,t0+4000) 
) 

I get the expected result:

var c = Cooccurrences() 
c.initFromActions(actions) 
// c == Cooccurrences(2,2,0,1) 
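
The bigrams that initFromActions walks over can also be tallied directly on a local collection, which gives a quick cross-check of the counters. This is a sketch under assumptions: BigramCounts, label and bigramCounts are hypothetical names, and the data is the session1 list from above.

```scala
sealed trait Action
case class Query(input: String) extends Action
case class Click(link: String) extends Action

object BigramCounts {
  // Reduce an action to its kind, ignoring the query text or link.
  private def label(a: Action): String = a match {
    case _: Query => "Query"
    case _: Click => "Click"
  }

  // Frequency of each (previous kind, next kind) pair, in timestamp order.
  def bigramCounts(actions: Iterable[(Action, Long)]): Map[(String, String), Int] =
    actions.toSeq.sortBy(_._2).map(p => label(p._1))
      .sliding(2)
      .collect { case Seq(a, b) => (a, b) } // skips the short window of a one-action session
      .toList
      .groupBy(identity)
      .map { case (pair, hits) => pair -> hits.size }

  val t0 = 1417444964686L
  val session1: Seq[(Action, Long)] = Seq(
    (Query("query1"), t0),
    (Click("link1"), t0 + 1000),
    (Click("link2"), t0 + 2000),
    (Query("query2"), t0 + 3000),
    (Click("link3"), t0 + 4000)
  )

  def main(args: Array[String]): Unit =
    bigramCounts(session1).foreach(println)
}
```

For session1 the (Query, Click) bigram occurs twice, which agrees with numQueriesWithClicks == 2 in the result above.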

Something goes wrong when I build a Cooccurrences object from the RDD. It seems to be linked to the CompactBuffer built by groupByKey().
What am I missing?

I am new to Spark and Scala. Thanks in advance for your help.

Thomas

+0

I just plugged it in and it works for me... – 2015-02-07 01:37:01

+0

Can you really execute the Spark instruction 'session_cooc_stats.take(2)' without any change? I still get a scala.MatchError: List((Query(query3),1417444969686), (Click(link4),1417444970686)) – 2015-02-07 08:03:44

Answers

-2

I set up your code in IntelliJ.

I created a class for each of Action, Query, Click and Cooccurrences.

Then I put the rest of your code in the main body.

val sessions_groups:RDD[(String, Iterable[(Action, Long)])] = logs.groupByKey().cache() 

val session_cooc_stats:RDD[Cooccurrences] = sessions_groups.map{ 
    case (sessionId, actions) => { 
        val coocs = Cooccurrences() 
        coocs.initFromActions(actions) 
        coocs 
    } 
} 
session_cooc_stats.take(2).foreach(println(_)) 

The only change I made was var coocs to val coocs.

I get this output:

Cooccurrences(0,1,0,1)

Cooccurrences(2,3,1,1)

0

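As you suggested, I rewrote the code in IntelliJ and created a companion object with a main function. Surprisingly, the code compiles (with sbt) and runs flawlessly.

However, I do not understand why the compiled code runs while the same code fails in spark-shell.

Thank you for your answer!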