2016-11-08 95 views
1

我正在嘗試使用zeppelin繪製實時圖形。我對每分鐘的推文進行情緒分析。我可以靜態查詢並繪製圖表。但我希望這是動態完成的。我是zeppelin的新手,對angularJS知之甚少。什麼應該是解決這個問題的正確方法?如何在齊柏林飛艇上繪製實時圖形?

val final_score=uni_join.map{case((year,month,day,hour,minutes),(tweet_count,sentiment))=>(year, month, day, hour, minutes(sentiment/tweet_count).ceil)} 


final_score.saveToCassandra("twitter", "score",writeConf = WriteConf(ttl = TTLOption.constant(1000))) 

final_score.foreachRDD(score => { 
val rowRDD =score.map{case(year,month,day,hour,minutes,sentiment) =>(year,month,day,hour,minutes,sentiment) } 
    val tempDF = sqlContext.createDataFrame(rowRDD) 

    z.angularBindGlobal("stream", parsed) //to bind parsed to stream. 
    tempDF.registerTempTable("realTimeTable") 
}) 

對上表做一個查詢,我能夠得到圖。但我想每分鐘動態更新一次圖表,以便與情緒分數保持同步。 感謝之前。 [更新]爲飛艇筆記本角部分如下:

%angular 
<div id="graph" style="height: 100%; width: 100%"> 
<canvas id="myChart" width="400" height="400"></canvas> 
<div id="legendDiv"></div> 
</div> 
<script> 
function initMap() { 

var colorList = ["#fde577", "#ff6c40", "#c72a40", "#520833", "#a88399"] 


var el = angular.element($('#stream')); 

console.log("El is "+el) //returns el as object 

angular.element(el).ready(function() { 
    console.log('Hello') 
    window.locationWatcher =el.$scope.$watch('stream', function(new, old){ 
console.log('changed');}, true)}) 
</script> 

但運行此代碼保持返回下面的錯誤。

vendor.js:29 jQuery.Deferred exception: Cannot read property '$watch' of undefined TypeError: Cannot read property '$watch' of undefined 

,我使用的火花版本是1.6 而飛艇是0.6

回答

5

spark-highcharts自版本0.6.3支持Spark Structured Streaming

對於聚合後的structuredDataFrame,在下面的代碼中使用了一個Zeppelin段落。 OutputMode可以是appendcomplete取決於如何聚合structureDataFrame。

import com.knockdata.spark.highcharts._ 
import com.knockdata.spark.highcharts.model._ 

val query = highcharts(
    structuredDataFrame.seriesCol("country") 
    .series("x" -> "year", "y" -> "stockpile") 
    .orderBy(col("year")), z, "append") 

而下一段代碼如下。當有新數據進入structureDataFrame時,本段中的圖表將會更新。

StreamingChart(z) 

運行以下代碼停止更新圖表。

query.stop() 

以下是生成structureDataFrame的示例。

spark.conf.set("spark.sql.streaming.checkpointLocation","/usr/zeppelin/checkpoint") 

case class NuclearStockpile(country: String, stockpile: Int, year: Int) 

val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640, 
    1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126, 
    27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662, 
    26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605, 
    24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586, 
    22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950, 
    10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104). 
    zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2)) 

val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
    5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322, 
    4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478, 
    15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049, 
    33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000, 
    35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000, 
    21000, 20000, 19000, 18000, 18000, 17000, 16000). 
    zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2)) 

input.addData(USA.take(30) ++ USSR.take(30)) 
val structureDataFrame = input.toDF 

而下面的代碼可以模擬更新圖表。下面的代碼運行時,圖表將會更新。

input.addData(USA.drop(30) ++ USSR.drop(30)) 

NOTE: The example using Zeppelin 0.6.2 and Spark 2.0

NOTE: Please check the highcharts license for commercial usage

+0

非常感謝您的回答。 但我使用的spark版本是1.6,zeppelin是0.6.0。 你的回答對這些版本是否適用?如果不是,那麼應該採取什麼方法。 – Agnirudra

+0

對於Spark 1.6,它需要使用正常的DataFrame和一些代碼來監視觸發劇情的變化 –

+0

我已經將問題的更新添加到了角色段落。但它一直返回以下錯誤 'vendor.js:29 jQuery.Deferred異常:無法讀取未定義的屬性'$ watch'TypeError:無法讀取未定義的'$ watch'屬性' – Agnirudra