HBase vs Google Bigtable: scanning a large number of rows
2017-09-23

I am trying to run a scan between a start row and an end row on Bigtable. There are roughly 100K elements between the scan boundaries, and I would like to process them in batches, as I can in HBase with setCaching(500).

On Bigtable, setCaching seems to be ignored and the client tries to fetch the entire result set in a single RPC. How can I get behaviour similar to HBase?

I am using the Java driver bigtable-hbase-1.1, version 1.0.0-pre3.

Bigtable configuration:

Configuration conf = new Configuration();
// Disable throttling of the BufferedMutator
conf.set("google.bigtable.buffered.mutator.throttling.enable", "false");
// Generous timeouts (25 minutes, in ms) for RPCs, partial-row reads within a scan, and long-running RPCs
conf.set("google.bigtable.rpc.timeout.ms", "1500000");
conf.set("google.bigtable.grpc.read.partial.row.timeout.ms", "1500000");
conf.set("google.bigtable.long.rpc.timeout.ms", "1500000");
// Do not retry RPCs that fail with DEADLINE_EXCEEDED
conf.set("google.bigtable.grpc.retry.deadlineexceeded.enable", "false");
// BufferedMutator limits: concurrent in-flight RPCs and row keys per bulk request
conf.set("google.bigtable.buffered.mutator.max.inflight.rpcs", "500");
conf.set("google.bigtable.bulk.max.row.key.count", "500");

Configuration conff = BigtableConfiguration.configure(conf, projectID, instanceID);
connection = BigtableConfiguration.connect(conff);

Scanner configuration:

byte[] start = "prefix".getBytes();
byte[] end = Bytes.add("prefix".getBytes(), new byte[] { (byte) 0xff });
Scan scan = new Scan(start, end);

The expected number of rows returned is on the order of 100K.
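For reference, this is a minimal sketch of the batched iteration I use with plain HBase and would like to reproduce against Bigtable (the table name is a placeholder; connection, start and end are set up as above):

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

private static void scanInBatches(Connection connection, byte[] start, byte[] end) throws IOException {
  Scan scan = new Scan(start, end);
  scan.setCaching(500); // HBase fetches rows from the server 500 at a time

  try (Table table = connection.getTable(TableName.valueOf("my-table")); // placeholder table name
       ResultScanner scanner = table.getScanner(scan)) {
    for (Result row : scanner) {
      // process each row; the scanner transparently pulls the next batch of 500
    }
  }
}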

Answer


You shouldn't have to worry about batching when reading rows. Bigtable responses are streamed and are back-pressure aware; we rely on gRPC to buffer chunks of the stream. Here is an introduction to gRPC streaming: https://grpc.io/docs/guides/concepts.html#server-streaming-rpc

Would you mind trying this sample code and letting me know whether it works (i.e., no deadline exceeded errors)? If the sample code works, please modify it to scan your own data and make sure it still works. If it doesn't, please let me know.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.google.cloud.example</groupId>
  <artifactId>row-write-read-example</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigtable</groupId>
      <artifactId>bigtable-hbase-1.x</artifactId>
      <version>1.0.0-pre3</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

The Java:

import com.google.cloud.bigtable.hbase.BigtableConfiguration; 
import java.io.IOException; 
import org.apache.hadoop.hbase.HColumnDescriptor; 
import org.apache.hadoop.hbase.HConstants; 
import org.apache.hadoop.hbase.HTableDescriptor; 
import org.apache.hadoop.hbase.TableName; 
import org.apache.hadoop.hbase.client.Admin; 
import org.apache.hadoop.hbase.client.BufferedMutator; 
import org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.client.Table; 

public class WriteReadTest {
  private static final String PROJECT_ID = "<YOUR_PROJECT_ID>";
  private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>";
  private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>";
  private static final String FAMILY = "cf";

  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID);

  public static void main(String[] args) throws IOException {
    try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID);
         Admin admin = connection.getAdmin()) {

      // Setup
      admin.createTable(
          new HTableDescriptor(TABLE_NAME)
              .addFamily(new HColumnDescriptor(FAMILY))
      );

      try {
        // Write the rows
        populateTable(connection, 2_000_000);

        // Read the rows
        readFullTable(connection);
      } finally {
        admin.disableTable(TABLE_NAME);
        admin.deleteTable(TABLE_NAME);
      }
    }
  }

  private static void populateTable(Connection connection, int rowCount) throws IOException {
    long startTime = System.currentTimeMillis();
    int buckets = 100;
    int maxWidth = Integer.toString(buckets).length();

    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) {
      for (int i = 0; i < rowCount; i++) {
        String prefix = String.format("%0" + maxWidth + "d", i % buckets);
        String key = prefix + "-" + String.format("%010d", i);
        String value = "value-" + key;

        Put put = new Put(key.getBytes())
            .addColumn(
                FAMILY.getBytes(),
                HConstants.EMPTY_BYTE_ARRAY,
                value.getBytes()
            );

        bufferedMutator.mutate(put);
      }
    }

    long endTime = System.currentTimeMillis();
    System.out.printf("Populated table in %d secs, writing %d rows\n", (endTime - startTime) / 1000, rowCount);
  }

  private static void readFullTable(Connection connection) throws IOException {
    long startTime = System.currentTimeMillis();

    int count = 0;
    try (Table table = connection.getTable(TABLE_NAME);
         ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) {

      for (Result row = scanner.next(); row != null; row = scanner.next()) {
        count++;
      }
    }

    long endTime = System.currentTimeMillis();

    System.out.printf("Scanned table in %d secs, reading %d rows\n", (endTime - startTime) / 1000, count);
  }
}
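To try it, fill in PROJECT_ID, INSTANCE_ID and TABLE_ID first; the program creates the table, writes 2 million rows with a BufferedMutator, scans them all back, and then deletes the table, printing timings for both phases. One way to launch it (assuming the Maven exec plugin is available) is mvn compile exec:java -Dexec.mainClass=WriteReadTest.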

I have kept the RPC timeout as long as 5 minutes and I still get deadline exceeded errors. With only 100K rows? – Peter


Is there a way to tune the size of the buffered chunks? It may be too small. In my case the client is in Singapore and the Bigtable instance is in Taiwan, so one round trip takes about 50 ms. – Peter


Or is it because Bigtable first materializes the entire result on the server and then streams it to the client? In that case fetching the whole result set could take quite a while? – Peter