2016-02-10 28 views
1

我在C一系列元組的,這是一個日誌的活動由用戶1更新變量取決於具體條件Scala中

scala> C.collect.foreach(println) 
((1,A,1),1) 
((1,B,2),1) 
((1,C,4),2) 
((1,D,7),3) 
((1,E,15),8) 
((1,F,16),1) 

在第一元組(1,2中的第三條目,4 ,7,15,16)是時間戳,並且第二條目(1,1,2,3,8,1)是連續時間戳之間的差異。

我試圖創建一個會話,無論這個用戶第一次啓動一個動作,還是在一定的時間後,TIMEOUT

我的計劃是首先將ID s分配給每個元組,然後將它們映射爲成對。 ID s將是它所屬會話中的第一個時間戳。

例如,如果TIMEOUT = 2,示例將被映射到

(1, (1,"A",1)) 
(1, (1,"B",2)) 
(4, (1,"C",4)) //creation of a new session with ID 4 
(7, (1,"D",7)) //creation of a new session with ID 7 
(15, (1,"E",15)) //creation of a new session with ID 15 
(15, (1,"F",16)) 

然後,我將通過會話處理數據會話。

但是,我在這個映射中遇到困難。

我需要保留某種全局變量來跟蹤TIMEOUT中的最後一個時間戳記,並在創建新會話時更新此變量,並將其作爲後續條目的ID

因爲這是Spark,我正在使用Accumulatoraccum像一個全局變量。

如果時間戳差異> = 2,我不知道如何設置accum的值,然後使用新值作爲新會話的ID。如果時間戳差異< 2,則會話的ID保持不變。

我嘗試到目前爲止是

val accum = sc.accumulator(0, "My Accumulator") 
C.map(x => (x._2 match { 
    case _ if (x._2 > -2) => accum.setValue(x._1._3); accum.value 
    case _ => accum.value 
}, x._1)).collect 

和失敗。

我想這是因爲accum.setValue()是一個副作用,而不是一個值,並且這是不允許在scala。另外,在scala中,對象的變異也是皺起了眉頭。我也知道語法是錯誤的。但是,我想不出任何其他方式來做到這一點。

我該如何實現這種映射?謝謝。

+1

pleeeeeeese不要使用元組來分組數據。用例類。更容易閱讀,更安全,更易於使用。 –

+0

你有多少條記錄?有多少用戶?平均會話長度是多少?你預計每個會話有多少條目?你需要確切的結果還是可以接受一些誤報?總時間線是多少(min-timestamp - max-timestamp)? – zero323

+0

嗨,我會發現更多的細節後我找出來。謝謝。 – user2418202

回答

0

問題不是副作用。在任何你想要的Scala中都允許有副作用。它們只是在功能代碼中不受鼓勵。問題只在於,如果您希望在其中包含多個語句,則需要將函數體放入{}中。也只使用一個匹配來產生一個if是毫無意義的。我還假設你想要條件> = 2不是> -2,至少可以適合你的例子。

所以這應該工作:

val accum = sc.accumulator(0, "My Accumulator") 
C.map(x => 
    (if (x._2 >= 2) { 
    accum.setValue(x._1._3) 
    accum.value 
    } else accum.value, 
    x._1) 
).collect 

唯一的問題是第一個ID,你會在你的ID有0,直到你發現第一個暫停。但是你的例子並沒有真正解釋你想如何處理這個邊緣案例。

但我不會使用副作用來解決這個問題。有對序列的方法scanLeft,可以讓你同時具有進入前值做了改造:

val list = List(
    ((1,"A",1),1), 
    ((1,"B",2),1), 
    ((1,"C",4),2), 
    ((1,"D",7),3), 
    ((1,"E",15),8), 
    ((1,"F",16),1)) 
list.tail.scanLeft((list.head._1._1, list.head._1)){ 
    case ((id, _), ((a, b, id2), delta)) => 
    if(delta < 2) (id, (a,b,id2)) 
    else (id2, (a,b,id2)) 
} 

如明確指定的第一個元素這也與第一ID解決了這個問題。這顯然假定您的序列中至少有一個元素。

+0

這不起作用。它看起來像'C'是'RDD',它不是'Iterable'。此外,您不能在轉換中訪問累加器值。 – zero323

+0

我不知道火花。查看RDD的文檔,第一個示例將在語法上起作用,因爲RDD具有映射方法。但是隨着RDD實施一些分佈式魔法,副作用將不起作用。然而這與Scala本身無關。對於第二個例子:如果您首先收集數據,這將起作用。 RDD沒有掃描。人們可以嘗試將其轉換爲摺疊,但我認爲這種內在的順序操作對於分佈式數據結構RDD沒有多大意義。 – dth

+0

那麼,它實際上可以並行完成,但確切的方法高度依賴於數據分佈。 – zero323