我在C
一系列元組的,這是一個日誌的活動由用戶1更新變量取決於具體條件Scala中
scala> C.collect.foreach(println)
((1,A,1),1)
((1,B,2),1)
((1,C,4),2)
((1,D,7),3)
((1,E,15),8)
((1,F,16),1)
在第一元組(1,2中的第三條目,4 ,7,15,16)是時間戳,並且第二條目(1,1,2,3,8,1)是連續時間戳之間的差異。
我試圖創建一個會話,無論這個用戶第一次啓動一個動作,還是在一定的時間後,TIMEOUT
。
我的計劃是首先將ID
s分配給每個元組,然後將它們映射爲成對。 ID
s將是它所屬會話中的第一個時間戳。
例如,如果TIMEOUT = 2
,示例將被映射到
(1, (1,"A",1))
(1, (1,"B",2))
(4, (1,"C",4)) //creation of a new session with ID 4
(7, (1,"D",7)) //creation of a new session with ID 7
(15, (1,"E",15)) //creation of a new session with ID 15
(15, (1,"F",16))
然後,我將通過會話處理數據會話。
但是,我在這個映射中遇到困難。
我需要保留某種全局變量來跟蹤TIMEOUT
中的最後一個時間戳記,並在創建新會話時更新此變量,並將其作爲後續條目的ID
。
因爲這是Spark
,我正在使用Accumulator
accum
像一個全局變量。
如果時間戳差異> = 2,我不知道如何設置accum
的值,然後使用新值作爲新會話的ID
。如果時間戳差異< 2,則會話的ID
保持不變。
我嘗試到目前爲止是
val accum = sc.accumulator(0, "My Accumulator")
C.map(x => (x._2 match {
case _ if (x._2 > -2) => accum.setValue(x._1._3); accum.value
case _ => accum.value
}, x._1)).collect
和失敗。
我想這是因爲accum.setValue()
是一個副作用,而不是一個值,並且這是不允許在scala
。另外,在scala
中,對象的變異也是皺起了眉頭。我也知道語法是錯誤的。但是,我想不出任何其他方式來做到這一點。
我該如何實現這種映射?謝謝。
pleeeeeeese不要使用元組來分組數據。用例類。更容易閱讀,更安全,更易於使用。 –
你有多少條記錄?有多少用戶?平均會話長度是多少?你預計每個會話有多少條目?你需要確切的結果還是可以接受一些誤報?總時間線是多少(min-timestamp - max-timestamp)? – zero323
嗨,我會發現更多的細節後我找出來。謝謝。 – user2418202