2016-08-02 81 views
0

我們有一個流數據,我在HBase表中有一些主信息。對於每一行我都需要查找HBase主表並獲取一些配置文件信息。我的代碼是這樣的foreach中的Spark Streaming過濾條件--NullPointerException

val con    = new setContext(hadoopHome,sparkMaster) 
val l_sparkcontext = con.getSparkContext 
val l_hivecontext = con.getHiveContext 

val topicname  = "events" 
val ssc    = new StreamingContext(l_sparkcontext, Seconds(30)) 
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10)) 
println("Kafka Stream for receiving Events..") 

val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile") 
profile_data.foreach(println) 
val tabBC = l_sparkcontext.broadcast(profile_data) 

eventsStream.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    val subs_profile_rows = tabBC.value 
    val Rows = record._2.split(rowDelim) 
    Rows.foreach(row => { 
     val values = row.split(colDelim) 
     val riid = values(1).toInt 
     val cond = "riid = " + riid 
     println("Condition : ", cond) 
     val enriched_events = subs_profile_rows.filter(cond) 
    }) // End of Rows 
    }) // End of RDD 
}) // End of Events Stream 

不幸的是,我總是打在過濾器上的NPE。我在這裏接受了幾個問題和答案,以跨工作節點廣播價值觀,但沒有任何幫助。有人可以幫忙嗎?

問候

巴拉

+0

檢查您是否使用無法序列化的值。 – cchantep

+0

我不知道是否應在foreach內創建profile_data,這是不可序列化的。 –

回答

0

您的上下文用法看起來有點腥......對我來說,它看起來像你創建兩個獨立的環境中(一個火花,一個火花流),然後試圖在這些上下文之間共享一個廣播變量(這不起作用)。

我們有一些我們周圍寫的代碼是相似的。這些視頻展示了我們如何在Splice Machine(開源)中做到這一點,以防您感興趣。我會嘗試查找代碼或讓其他人爲您發佈代碼。

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/

好運。

+0

謝謝約翰。我會看看視頻。要求是從HBase表中讀取配置文件信息,以獲取DStream的數據。我也獲得了forEachPartition(將發佈更改的代碼作爲下一條評論),但這給了我不同的錯誤。如果你能得到它,我將等待代碼。非常感謝您的幫助 –

+0

由於篇幅限制,我將不得不將我的代碼分成2篇文章。 - 開始class setContext(argHadoopHome:String,argSparkMaster:String){System.setProperty(「hadoop.home.dir」,argHadoopHome) val conf = new SparkConf()。setMaster(argSparkMaster); conf.setAppName(「Evts」); 私人VAL l_valSparkContext =新SparkContext(CONF) 私人VAL l_hiveContext =新HiveContext(l_valSparkContext) DEF getSparkContext = l_valSparkContext DEF getHiveContext = l_hiveContext DEF getconfContext = CONF } –

+0

對象receiveEvents { DEF主(參數:數組[String]):單位= { var rD =「\ r \ n」 var cD =「,」 var sM =「spark:// nm2:7077」 var ip =「nm2:2181」 var hadoopHome =「/ home/..」 val con = new setContext(ip,sM) val l_sparkcontext = con.getSparkContext val topicname =「evt」 val ssc = new StreamingContext(l_sparkcontext,Seconds(9)) val eventsStream \t = KafkaUtils.createStream(ssc,「nm2:2181」,「rcv」,Map(topicname.toString - > 2)) val profile_data = w_hivecontext.sql(「select gender,income,age from hb_cust_pro」) –