HBASE-17371 Enhance 'HBaseContextSuite @ distributedScan to test HBase client' with filter

This commit is contained in:
tedyu 2016-12-27 15:44:18 -08:00
parent 5ffbd4a87d
commit ccb8d671d5
1 changed files with 12 additions and 0 deletions

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseTestingUtility}
import org.apache.spark.{SparkException, Logging, SparkContext}
@ -311,6 +312,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
put = new Put(Bytes.toBytes("scan2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("scan2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo-2"))
table.put(put)
put = new Put(Bytes.toBytes("scan3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
@ -328,15 +332,23 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val hbaseContext = new HBaseContext(sc, config)
val scan = new Scan()
val filter = new FirstKeyOnlyFilter()
scan.setCaching(100)
scan.setStartRow(Bytes.toBytes("scan2"))
scan.setStopRow(Bytes.toBytes("scan4_"))
scan.setFilter(filter)
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
try {
val scanList = scanRdd.map(r => r._1.copyBytes()).collect()
assert(scanList.length == 3)
var cnt = 0
scanRdd.map(r => r._2.listCells().size()).collect().foreach(l => {
cnt += l
})
// the number of cells returned would be 4 without the Filter
assert(cnt == 3);
} catch {
case ex: Exception => ex.printStackTrace()
}