2016-01-13 49 views
0

我需要從服務器端日誌中計算應用程序的「新用戶,活動用戶」。 我已經實現了scala和spark的日常計算算法。這項工作每天提交一次,並獲得當天的所有結果。它運作良好。
這裏是我的舊日常算法實現的一些僞代碼。此代碼運行,每天一次,並得到了一組每日結果:如何使用spark-streaming實現日常計算並獲得實時結果

// Get today log from hbase or somewhere else 
val log = getRddFromHbase(todayDate) 
// Compute active user 
val activeUser = log.map(line => ((line.uid, line.appId), line).reduceByKey(distinctStrategyMethod) 
// Get history user from hdfs 
val historyUser = loadFromHdfs(path + yesterdayDate) 
// Compute new user from active user and historyUser 
val newUser = activeUser.subtractByKey(historyUser) 
// Get new history user 
val newHistoryUser = historyUser.union(newUser) 
// Save today history user 
saveToHdfs(path + todayDate) 

現在我想獲得「實時」的結果:
1的結果應重新計算,並改爲每5分鐘或減。
2.結果應該在一天的開始時爲0,並且與一天結束時的舊算法相同。

我認爲如果我使用一個恆定的時間窗口(1天,每5分鐘滑動一次,我認爲)來實現算法是不對的。
如果有人能爲我提供一些想法或示例,我將不勝感激。謝謝你的時間。

回答

0

而是從HBase的得到它,你可以利用以下策略: -

  1. Flume-NG - 安裝水槽代理,要麼「尾巴」的日誌文件或代碼自己Source捕捉實時生成的事件-time.List item
  2. Flume-Spark Listener - 按照指定的說明進行操作,並在Near Real-time(非實時)中接收事件。
  3. 下一頁在星火流的代碼,我們可以用60秒或者更少這樣創建的StreamingContext: -

    val streamCtx = new StreamingContext(conf, Seconds(60)) 
    val lines = streamCtx.socketTextStream("HOST_NAME", PORT, MEMORY_AND_DISK_SER_2) 
    //Next - Depending upon the format do the further computations 
    //and either print the results or Store in RDBMS/ NOSQL - HBASE/Cassandra 
    
+0

其實我可以正確讀取從卡夫卡流和過程DSTREAM。我需要一些關於流式算法,spark-streaming的api和解決這類問題的思考。也許你可以在這個問題上幫助我。這是更具體的。 [如何使用spark-streaming從流中計算新元素](http://stackoverflow.com/questions/34786117/how-to-count-new-element-from-stream-by-using-spark-streaming) – yyforever1988