2016-11-13 58 views
0

我有一個彈性索引,其中包含用戶狀態歷史記錄的文檔。數據看起來像這樣;只彙總最新文檔

{ 
    "session_id": "yunus", 
    "state_name": "start", 
    "entry_time": "2016-11-09 15:27:03" 
    }, 
    { 
    "session_id": "yunus", 
    "state_name": "end", 
    "entry_time": "2016-11-09 16:30:00" 
    }, 
    { 
    "session_id": "can", 
    "state_name": "start", 
    "entry_time": "2016-11-09 12:01:00" 
    }, 
    { 
    "session_id": "rick", 
    "state_name": "start", 
    "entry_time": "2016-11-09 09:00:00" 
    }, 
    { 
    "session_id": "rick", 
    "state_name": "end", 
    "entry_time": "2016-11-10 10:00:00" 
    } 

我想通過州名與日期直方圖進行彙總,但僅當時的相關最後狀態。所以結果可以;

2016-11-08 
start = 0 
end = 0 

2016-11-09 
start = 2 
end = 1 

2016-11-10 
start = 1 
end = 2 

實際上計劃是生成帶時間軸的分組條形圖以顯示隨時間變化的狀態。

我嘗試了一些東西,如聚合管道,頂級點擊,但無法取得任何進展。

任何幫助表示讚賞。

回答

0

對於任何有興趣的人,我用火花解決了它。我用elastic-spark從elasticsearch中讀取數據,然後回寫到elasticsearch。

這是從es讀取爲Rdd;

val allData = sc.esRDD(s"states_${id}/log", query) 

然後我先按會話id分組,按日期排序找到最新的會話狀態;

val latestStates = allData.groupBy(k => k._2.get("session_id").get).map(k => (k._2).reduceLeft((d1, d2) => { 
    d1._2.get("timestamp").get.asInstanceOf[Long] > d2._2.get("timestamp").get.asInstanceOf[Long] match { 
    case true => d1 
    case false => d2 
    } 
})).map(_._2) 

一旦我有最新的會話狀態,我篩選退出狀態,然後按值計數;

val stateSummary = latestStates 
    .filter(s => s.isDefinedAt("state_id") && s("state_id").asInstanceOf[Long] != -1) 
    .map(s => (s("state_id"), s("state_name"))) 
    .countByValue() 
    .map(d => Map("state_id" -> d._1._1.asInstanceOf[Long], "state_name" -> d._1._2.asInstanceOf[String], "count" -> d._2)).toList 

現在我們有當前的狀態數。 (當前是可配置的,所以我們可以將其設置爲特定時間),只剩下一件事,回寫到elasticsearch;

sc.makeRDD(Seq(finalElasticDoc)).saveToEs(s"states_${id}/analytic_daily")