2015-03-19 100 views
1

請看看下面的火花流用Scala編寫代碼:星火流 - 問題與傳遞參數

object HBase { 
    var hbaseTable = "" 
    val hConf = new HBaseConfiguration() 
    hConf.set("hbase.zookeeper.quorum", "zookeeperhost") 

    def init(input: (String)) { 
    hbaseTable = input 
    } 
    def display() { 
    print(hbaseTable) 
    } 
    def insertHbase(row: (String)) { 
    val hTable = new HTable(hConf,hbaseTable) 
    } 
} 

object mainHbase { 
    def main(args : Array[String]) { 
    if (args.length < 5) { 
     System.err.println("Usage: MetricAggregatorHBase <zkQuorum> <group> <topics> <numThreads> <hbaseTable>") 
     System.exit(1) 
    } 
    val Array(zkQuorum, group, topics, numThreads, hbaseTable) = args 
    HBase.init(hbaseTable) 
    HBase.display() 
    val sparkConf = new SparkConf().setAppName("mainHbase") 
    val ssc = new StreamingContext(sparkConf, Seconds(10)) 
    ssc.checkpoint("checkpoint") 
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 
    val storeStg = lines.foreachRDD(rdd => rdd.foreach(HBase.insertHbase)) 
    lines.print() 
    ssc.start() 
    } 
} 

我試圖通過調用HBase.init方法來初始化的對象HBase參數hbaseTable。它正確設置參數。我確認通過在下一行中調用HBase.display方法。

但是,當調用HBase.insertHbase方法中的foreachRDD被調用時,其拋出錯誤hbaseTable未設置。

更新,出現異常:

java.lang.IllegalArgumentException: Table qualifier must not be empty 
     org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:179) 
     org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:149) 
     org.apache.hadoop.hbase.TableName.<init>(TableName.java:303) 
     org.apache.hadoop.hbase.TableName.createTableNameIfNecessary(TableName.java:339) 
     org.apache.hadoop.hbase.TableName.valueOf(TableName.java:426) 
     org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:156) 

能否請你讓我知道如何使此代碼的工作。

+0

請用引發的確切異常來更新您的問題。 – lambdas 2015-03-19 05:24:27

+0

@lambdas更新了拋出的異常。 – vijay 2015-03-19 05:48:16

回答

2

「這段代碼在哪裏運行」 - 這是我們爲了瞭解正在發生的事情而需要問的問題。

HBase是一個Scala對象 - 根據定義,它是一個單例構造,它在JVM中以'只有一次'語義進行初始化。

在初始化點,HBase.init(hbaseTable)在該Spark應用程序的驅動程序中執行,使用驅動程序的VM中的給定值初始化此對象。

但是,當我們執行:rdd.foreach(HBase.insertHbase)時,閉包將作爲每個執行器上的任務執行,該執行器承載給定RDD的分區。此時,對每個執行程序在每個VM上初始化對象HBase。正如我們所看到的那樣,在這個對象上沒有發生初始化。

這裏有兩種選擇:

我們可以添加一些檢查「將IsInitialized」到HBase對象,並新增-now conditional-調用每次調用初始化foreach。 另一種選擇是使用

rdd.foreachPartitition{partition => 
    HBase.initialize(...) 
    partition.foreach(elem => HBase.insert(elem)) 
} 

這種結構將通過元件的每個分區中的量分期償還任何初始化。也可以將它與初始化檢查結合起來以防止不必要的引導工作。

+0

非常感謝。這對我有效。 – vijay 2015-03-20 10:10:43