
HBase Table.batch takes 300 seconds to insert 800,000 entries into a table

I am reading a JSON file of about 30 MB and building the column family and key values from it. I create a Put object, add the row key and values to it, collect these Put objects in a list, and call Table.batch() with that list. I call it whenever my array list reaches a size of 50,000, then clear the list and send the next batch. However, a file that ends up with 800,000 entries takes 300 seconds to process. I also tried table.put, but it was even slower. I am using HBase 1.1, and the JSON comes from Kafka. Any suggestions for improving the performance are appreciated. I searched the SO forums but did not find much help. I will share the code if you want to have a look at it.

Regards,

Raghavendra

public static void processData(String jsonData) 
{ 
    if (jsonData == null || jsonData.isEmpty()) 
    { 
     System.out.println("JSON data is null or empty. Nothing to process"); 
     return; 
    } 

    long startTime = System.currentTimeMillis(); 

    Table table = null; 
    try 
    { 
     table = HBaseConfigUtil.getInstance().getConnection().getTable(TableName.valueOf("MYTABLE")); 
    } 
    catch (IOException e1) 
    { 
     System.out.println(e1); 
    } 

    Put processData = null; 
    List<Put> bulkData = new ArrayList<Put>(); 

    try 
    { 

     //Read the json and generate the model into a class  
     //ProcessExecutions is List<ProcessExecution> 
     ProcessExecutions peData = JsonToColumnData.gson.fromJson(jsonData, ProcessExecutions.class); 

     if (peData != null) 
     { 
      //Read the data and pass it to Hbase 
      for (ProcessExecution pe : peData.processExecutions) 
      { 
       //Class Header stores some header information 
       Header headerData = pe.getHeader(); 

       String rowKey = headerData.getRowKey(); 
       processData = new Put(Bytes.toBytes(rowKey)); //use the row key parsed from the header 
       processData.addColumn(Bytes.toBytes("Data"), 
           Bytes.toBytes("Time"), 
           Bytes.toBytes("value")); 

       //Add to list 
       bulkData.add(processData);    
       if (bulkData.size() >= 50000) //hardcoded for demo 
       { 
        long tmpTime = System.currentTimeMillis(); 
        //batch() expects a results array sized to the number of submitted actions 
        Object[] results = new Object[bulkData.size()]; 
        table.batch(bulkData, results);      
        bulkData.clear(); 
        System.gc(); 
       } 
      } //end for 
      //Complete the remaining write operation 
      if (bulkData.size() > 0) 
      { 
       Object[] results = new Object[bulkData.size()]; 
       table.batch(bulkData, results); 
       bulkData.clear(); 
       //Try to free memory 
       System.gc(); 
      } 
    } 
    catch (Exception e) 
    { 
     System.out.println(e); 
     e.printStackTrace(); 
    } 
    finally 
    { 
     try 
     { 
      table.close(); 
     } 
     catch (IOException e) 
     { 
      System.out.println("Error closing table " + e); 
      e.printStackTrace(); 
     } 
    } 

} 


//This function is added here to show the connection 
/*public Connection getConnection() 
{ 

    try 
    { 
     if (this.connection == null) 
     { 
      ExecutorService executor = Executors.newFixedThreadPool(HBaseConfigUtil.THREADCOUNT); 
      this.connection = ConnectionFactory.createConnection(this.getHBaseConfiguration(), executor); 
     } 
    } 
    catch (IOException e) 
    { 
     e.printStackTrace(); 
     System.out.println("Error in getting connection " + e.getMessage()); 
    } 

    return this.connection; 
}*/ 

Please share the code snippet. –


Ideally, table.batch also works in a similar way, as described below. It should work as well. –


@RamGhadiyaram, thanks for your comment. I read your answer on another question, but it did not help me. Will share my code in a few minutes. – AnswerSeeker

Answer


I had the same situation, where I needed to parse 5 GB of JSON and insert it into an HBase table... You can try the approach below (it should work), which proved very fast for me with batches of 100,000 records.

public void addMultipleRecordsAtaShot(final ArrayList<Put> puts, final String tableName) throws Exception { 
    try { 
        final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName)); 
        table.put(puts); 
        LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK."); 
    } catch (final Throwable e) { 
        e.printStackTrace(); 
    } finally { 
        LOG.info("Processed ---> " + puts.size()); 
        if (puts != null) { 
            puts.clear(); 
        } 
    } 
} 
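
For reference, a hypothetical call site for the method above might look like the sketch below. The MYTABLE name and the Data:Time column are borrowed from the question; the row key is a made-up placeholder, not something from this answer.

ArrayList<Put> puts = new ArrayList<Put>(); 
Put put = new Put(Bytes.toBytes("row-1")); //placeholder row key, purely illustrative 
put.addColumn(Bytes.toBytes("Data"), Bytes.toBytes("Time"), Bytes.toBytes("value")); 
puts.add(put); 
//the method clears the list in its finally block, so the same list can be reused for the next batch 
addMultipleRecordsAtaShot(puts, "MYTABLE"); 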

For more details on increasing the buffer size, check my answer on increasing the buffer size in a different context, and refer to the documentation: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Table.html
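
As a minimal sketch only: in the HBase 1.x client, the buffered write path is BufferedMutator, and the client-side buffer can be raised via the hbase.client.write.buffer setting or BufferedMutatorParams.writeBufferSize(). The example below assumes the same MYTABLE table and Data:Time column from the question; the row key and the 8 MB buffer size are illustrative assumptions, not figures from this answer.

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.TableName; 
import org.apache.hadoop.hbase.client.BufferedMutator; 
import org.apache.hadoop.hbase.client.BufferedMutatorParams; 
import org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.ConnectionFactory; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.util.Bytes; 

public class BufferedWriteSketch { 
    public static void main(String[] args) throws IOException { 
        Configuration conf = HBaseConfiguration.create(); 
        //assumed value: raise the client write buffer from the 2 MB default to 8 MB 
        conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); 

        try (Connection connection = ConnectionFactory.createConnection(conf)) { 
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("MYTABLE")) 
                    .writeBufferSize(8 * 1024 * 1024); //per-mutator override, in bytes 
            try (BufferedMutator mutator = connection.getBufferedMutator(params)) { 
                Put put = new Put(Bytes.toBytes("row-1")); //placeholder row key 
                put.addColumn(Bytes.toBytes("Data"), Bytes.toBytes("Time"), Bytes.toBytes("value")); 
                mutator.mutate(put); //buffered client-side; flushed automatically when the buffer fills 
                mutator.flush();     //push out anything still sitting in the buffer 
            } 
        } 
    } 
} 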