HBASE-4199 blockCache summary - backend (Doug Meil)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1160488 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-08-22 22:58:39 +00:00
parent d0de098625
commit 0e152ad9e2
10 changed files with 671 additions and 8 deletions


@@ -393,6 +393,7 @@ Release 0.91.0 - Unreleased
HBASE-4236 Don't lock the stream while serializing the response (Benoit Sigoure)
HBASE-4237 Directly remove the call being handled from the map of outstanding RPCs
(Benoit Sigoure)
HBASE-4199 blockCache summary - backend (Doug Meil)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel


@@ -19,6 +19,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
@@ -90,4 +94,18 @@ public interface BlockCache {
public long getCurrentSize();
public long getEvictedCount();
/**
* Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects.
* This method could be fairly heavyweight in that it evaluates the entire HBase file-system
* against what is in the RegionServer BlockCache.
* <br><br>
* The contract of this interface is to return the List in sorted order by Table name, then
* ColumnFamily.
*
* @param conf HBaseConfiguration
* @return List of BlockCacheColumnFamilySummary
* @throws IOException exception
*/
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) throws IOException;
}
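For orientation only (not part of this commit): a minimal caller-side sketch of the new interface method. It assumes you already hold a BlockCache reference, for instance the one returned by StoreFile.getBlockCache(conf) as used elsewhere in this change; the class and method names below are illustrative.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;

public class BlockCacheSummaryPrinter {
  // Prints one line per table/column family; entries arrive sorted by table,
  // then column family, per the interface contract documented above.
  public static void print(BlockCache cache, Configuration conf) throws IOException {
    List<BlockCacheColumnFamilySummary> summaries =
        cache.getBlockCacheColumnFamilySummaries(conf);
    for (BlockCacheColumnFamilySummary s : summaries) {
      System.out.println(s.getTable() + "/" + s.getColumnFamily()
          + ": " + s.getBlocks() + " blocks, " + s.getHeapSize() + " heap bytes");
    }
  }
}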


@@ -0,0 +1,246 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
/**
* BlockCacheColumnFamilySummary represents a summary of the blockCache usage
* at Table/ColumnFamily granularity.
* <br><br>
* As ColumnFamilies are owned by Tables, a summary by ColumnFamily implies that
* the owning Table is included in the summarization.
*
*/
public class BlockCacheColumnFamilySummary implements Writable, Comparable<BlockCacheColumnFamilySummary> {
private String table = "";
private String columnFamily = "";
private int blocks;
private long heapSize;
/**
* Default constructor for Writable
*/
public BlockCacheColumnFamilySummary() {
}
/**
* @param table table that owns the cached blocks
* @param columnFamily column family that owns the cached blocks
*/
public BlockCacheColumnFamilySummary(String table, String columnFamily) {
this.table = table;
this.columnFamily = columnFamily;
}
/**
*
* @return table
*/
public String getTable() {
return table;
}
/**
*
* @param table (table that owns the cached block)
*/
public void setTable(String table) {
this.table = table;
}
/**
*
* @return columnFamily
*/
public String getColumnFamily() {
return columnFamily;
}
/**
*
* @param columnFamily (columnFamily that owns the cached block)
*/
public void setColumnFamily(String columnFamily) {
this.columnFamily = columnFamily;
}
/**
*
* @return blocks in the cache
*/
public int getBlocks() {
return blocks;
}
/**
* @param blocks number of blocks in the cache for this table/column family
*/
public void setBlocks(int blocks) {
this.blocks = blocks;
}
/**
*
* @return heapSize in the cache
*/
public long getHeapSize() {
return heapSize;
}
/**
* Increments the number of blocks in the cache for this entry
*/
public void incrementBlocks() {
this.blocks++;
}
/**
* @param heapSize heap size (in bytes) to add to this entry's running total
*/
public void incrementHeapSize(long heapSize) {
this.heapSize = this.heapSize + heapSize;
}
/**
*
* @param heapSize (total heapSize for the table/CF)
*/
public void setHeapSize(long heapSize) {
this.heapSize = heapSize;
}
@Override
public void readFields(DataInput in) throws IOException {
table = in.readUTF();
columnFamily = in.readUTF();
blocks = in.readInt();
heapSize = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(table);
out.writeUTF(columnFamily);
out.writeInt(blocks);
out.writeLong(heapSize);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((columnFamily == null) ? 0 : columnFamily.hashCode());
result = prime * result + ((table == null) ? 0 : table.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
BlockCacheColumnFamilySummary other = (BlockCacheColumnFamilySummary) obj;
if (columnFamily == null) {
if (other.columnFamily != null)
return false;
} else if (!columnFamily.equals(other.columnFamily))
return false;
if (table == null) {
if (other.table != null)
return false;
} else if (!table.equals(other.table))
return false;
return true;
}
@Override
public String toString() {
return "BlockCacheSummaryEntry [table=" + table + ", columnFamily="
+ columnFamily + ", blocks=" + blocks + ", heapSize=" + heapSize + "]";
}
/**
* Constructs a BlockCacheColumnFamilySummary from a full StoreFile Path.
* <br><br>
* The path is expected to be in the format of...
* <pre>
* hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
* </pre>
* ... where: <br>
* '-ROOT-' = Table <br>
* '70236052' = Region <br>
* 'info' = ColumnFamily <br>
* '3944417774205889744' = StoreFile
*
* @param path (full StoreFile Path)
* @return BlockCacheColumnFamilySummary
*/
public static BlockCacheColumnFamilySummary createFromStoreFilePath(Path path) {
// The full path will look something like this...
// hdfs://localhost:51169/user/doug.meil/-ROOT-/70236052/info/3944417774205889744
// i.e. .../&lt;table&gt;/&lt;region&gt;/&lt;columnFamily&gt;/&lt;storeFile&gt;
String sp = path.toString();
String s[] = sp.split("\\/");
BlockCacheColumnFamilySummary bcse = null;
if (s.length >= 4) {
// why 4? StoreFile, CF, Region, Table
String table = s[s.length - 4]; // 4th from the end
String cf = s[s.length - 2]; // 2nd from the end
bcse = new BlockCacheColumnFamilySummary(table, cf);
}
return bcse;
}
@Override
public int compareTo(BlockCacheColumnFamilySummary o) {
int i = table.compareTo(o.getTable());
if (i != 0) {
return i;
}
return columnFamily.compareTo(o.getColumnFamily());
}
/**
* Creates a new BlockCacheColumnFamilySummary carrying the table and
* column family of the supplied entry (block and heap counts start at zero).
*
* @param e the entry whose table/column family identity is copied
* @return new BlockCacheColumnFamilySummary
*/
public static BlockCacheColumnFamilySummary create(BlockCacheColumnFamilySummary e) {
BlockCacheColumnFamilySummary e2 = new BlockCacheColumnFamilySummary();
e2.setTable(e.getTable());
e2.setColumnFamily(e.getColumnFamily());
return e2;
}
}
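A quick, illustrative use of createFromStoreFilePath (not part of the commit; the path literal is simply the example from the javadoc above):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;

public class CreateFromPathExample {
  public static void main(String[] args) {
    Path sf = new Path(
        "hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744");
    BlockCacheColumnFamilySummary summary =
        BlockCacheColumnFamilySummary.createFromStoreFilePath(sf);
    // Table and column family are the 4th- and 2nd-from-last path components.
    System.out.println(summary.getTable());         // -ROOT-
    System.out.println(summary.getColumnFamily());  // info
  }
}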


@@ -19,21 +19,32 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -725,6 +736,46 @@ public class LruBlockCache implements BlockCache, HeapSize {
(concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) throws IOException {
Map<String, Path> sfMap = FSUtils.getTableStoreFilePathMap(
FileSystem.get(conf),
FSUtils.getRootDir(conf));
// quirky, but it's a compound key and this is a shortcut taken instead of
// creating a class that would represent only a key.
Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs =
new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>();
final String pattern = "\\" + HFile.CACHE_KEY_SEPARATOR;
for (CachedBlock cb : map.values()) {
// split name and get the first part (e.g., "8351478435190657655_0")
// see HFile.getBlockCacheKey for structure of block cache key.
String s[] = cb.getName().split(pattern);
if (s.length > 0) {
String sf = s[0];
Path path = sfMap.get(sf);
if ( path != null) {
BlockCacheColumnFamilySummary lookup =
BlockCacheColumnFamilySummary.createFromStoreFilePath(path);
BlockCacheColumnFamilySummary bcse = bcs.get(lookup);
if (bcse == null) {
bcse = BlockCacheColumnFamilySummary.create(lookup);
bcs.put(lookup,bcse);
}
bcse.incrementBlocks();
bcse.incrementHeapSize(cb.heapSize());
}
}
}
List<BlockCacheColumnFamilySummary> list =
new ArrayList<BlockCacheColumnFamilySummary>(bcs.values());
Collections.sort( list );
return list;
}
// Simple calculators of sizes given factors and maxSize
private long acceptableSize() {

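Two implementation details above are worth spelling out: the block cache key embeds the store file name (HFile.getBlockCacheKey builds it from the file name, a separator, and typically the block offset), and the summary object doubles as its own map key because equals/hashCode only consider table and column family. A minimal sketch of the key-splitting step, using a hypothetical key and a literal "_" separator purely for illustration (the real code splits on HFile.CACHE_KEY_SEPARATOR):

public class CacheKeySplitExample {
  public static void main(String[] args) {
    // Hypothetical cache key of the shape "<storeFileName><separator><offset>",
    // e.g. the "8351478435190657655_0" example mentioned in the code comments.
    String cacheKey = "8351478435190657655_0";
    String storeFileName = cacheKey.split("_")[0];   // real code uses HFile.CACHE_KEY_SEPARATOR
    // storeFileName is then resolved to a full Path via the map built by
    // FSUtils.getTableStoreFilePathMap and rolled up into the per-table/CF summary.
    System.out.println(storeFileName);   // 8351478435190657655
  }
}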

@@ -21,10 +21,11 @@ package org.apache.hadoop.hbase.io.hfile;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
@@ -123,5 +124,11 @@ public class SimpleBlockCache implements BlockCache {
public int evictBlocksByPrefix(String string) {
throw new UnsupportedOperationException();
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) {
throw new UnsupportedOperationException();
}
}


@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.ipc.RemoteException;
@@ -487,4 +488,14 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Delete delete)
throws IOException;
/**
* Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects.
* This method could be fairly heavyweight in that it evaluates the entire HBase file-system
* against what is in the RegionServer BlockCache.
*
* @return List of BlockCacheColumnFamilySummary
* @throws IOException exception
*/
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries() throws IOException;
}
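Client-side sketch of the new RPC (illustrative only; how the HRegionInterface proxy is obtained is outside the scope of this change, so the 'server' parameter below is assumed to be an already-connected proxy for a region server):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.ipc.HRegionInterface;

public class RegionServerCacheSummary {
  public static void dump(HRegionInterface server) throws IOException {
    // One entry per table/column family currently represented in that
    // region server's block cache.
    List<BlockCacheColumnFamilySummary> summaries =
        server.getBlockCacheColumnFamilySummaries();
    for (BlockCacheColumnFamilySummary s : summaries) {
      System.out.println(s);   // toString() from BlockCacheColumnFamilySummary
    }
  }
}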


@@ -98,7 +98,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
@@ -119,7 +119,6 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -3024,4 +3023,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
new HRegionServerCommandLine(regionServerClass).doMain(args);
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries() throws IOException {
BlockCache c = StoreFile.getBlockCache(this.conf);
return c.getBlockCacheColumnFamilySummaries(this.conf);
}
}


@@ -1091,4 +1091,61 @@ public abstract class FSUtils {
out.close();
}
}
/**
* Runs through the HBase rootdir and creates a reverse lookup map for
* table StoreFile names to the full Path.
* <br>
* Example...<br>
* Key = 3944417774205889744 <br>
* Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
*
* @param fs The file system to use.
* @param hbaseRootDir The root directory to scan.
* @return Map keyed by StoreFile name with a value of the full Path.
* @throws IOException When scanning the directory fails.
*/
public static Map<String, Path> getTableStoreFilePathMap(
final FileSystem fs, final Path hbaseRootDir)
throws IOException {
Map<String, Path> map = new HashMap<String, Path>();
// if this method looks similar to 'getTableFragmentation' that is because
// it was borrowed from it.
DirFilter df = new DirFilter(fs);
// presumes any directory under hbase.rootdir is a table
FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, df);
for (FileStatus tableDir : tableDirs) {
// Skip the .log directory. All others should be tables. Inside a table,
// there are compaction.dir directories to skip. Otherwise, all else
// should be regions.
Path d = tableDir.getPath();
if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
continue;
}
FileStatus[] regionDirs = fs.listStatus(d, df);
for (FileStatus regionDir : regionDirs) {
Path dd = regionDir.getPath();
if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
continue;
}
// else its a region name, now look in region for families
FileStatus[] familyDirs = fs.listStatus(dd, df);
for (FileStatus familyDir : familyDirs) {
Path family = familyDir.getPath();
// now in family, iterate over the StoreFiles and
// put in map
FileStatus[] familyStatus = fs.listStatus(family);
for (FileStatus sfStatus : familyStatus) {
Path sf = sfStatus.getPath();
map.put( sf.getName(), sf);
}
}
}
}
return map;
}
}
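Usage sketch for the new helper (the configuration setup and the store file name below are placeholders):

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class StoreFileLookupExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Reverse lookup: bare store file name -> full Path under the HBase root dir.
    Map<String, Path> storeFiles =
        FSUtils.getTableStoreFilePathMap(fs, FSUtils.getRootDir(conf));
    Path p = storeFiles.get("3944417774205889744");   // placeholder store file name
    System.out.println(p);
  }
}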


@@ -0,0 +1,120 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
/**
* Tests the BlockCacheColumnFamilySummary class
*
*/
public class TestBlockCacheColumnFamilySummary {
/**
* Two summaries with the same table and column family must be equal.
*/
@Test
public void testEquals() {
BlockCacheColumnFamilySummary e1 = new BlockCacheColumnFamilySummary();
e1.setTable("table1");
e1.setColumnFamily("cf1");
BlockCacheColumnFamilySummary e2 = new BlockCacheColumnFamilySummary();
e2.setTable("table1");
e2.setColumnFamily("cf1");
assertEquals("bcse", e1, e2);
}
/**
* Summaries for different tables must not be equal.
*/
@Test
public void testNotEquals() {
BlockCacheColumnFamilySummary e1 = new BlockCacheColumnFamilySummary();
e1.setTable("table1");
e1.setColumnFamily("cf1");
BlockCacheColumnFamilySummary e2 = new BlockCacheColumnFamilySummary();
e2.setTable("tablexxxxxx");
e2.setColumnFamily("cf1");
assertTrue("bcse", ! e1.equals(e2));
}
/**
* An equal table/column family key must find the existing map entry.
*/
@Test
public void testMapLookup() {
Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs =
new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>();
BlockCacheColumnFamilySummary e1 = new BlockCacheColumnFamilySummary("table1","cf1");
BlockCacheColumnFamilySummary lookup = bcs.get(e1);
if (lookup == null) {
lookup = BlockCacheColumnFamilySummary.create(e1);
bcs.put(e1,lookup);
lookup.incrementBlocks();
lookup.incrementHeapSize(100L);
}
BlockCacheColumnFamilySummary e2 = new BlockCacheColumnFamilySummary("table1","cf1");
BlockCacheColumnFamilySummary l2 = bcs.get(e2);
assertEquals("blocks",1,l2.getBlocks());
assertEquals("heap",100L,l2.getHeapSize());
}
/**
* Repeated puts with equal table/column family keys collapse to one map entry.
*/
@Test
public void testMapEntry() {
Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs =
new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>();
BlockCacheColumnFamilySummary e1 = new BlockCacheColumnFamilySummary("table1","cf1");
bcs.put(e1, e1);
BlockCacheColumnFamilySummary e2 = new BlockCacheColumnFamilySummary("table1","cf1");
bcs.put(e2, e2);
BlockCacheColumnFamilySummary e3 = new BlockCacheColumnFamilySummary("table1","cf1");
bcs.put(e3, e3);
assertEquals("mapSize",1,bcs.size());
}
}


@@ -0,0 +1,146 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests the block cache summary functionality in StoreFile,
* which contains the BlockCache
*
*/
public class TestStoreFileBlockCacheSummary {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final String TEST_TABLE = "testTable";
private static final String TEST_TABLE2 = "testTable2";
private static final String TEST_CF = "testFamily";
private static byte [] FAMILY = Bytes.toBytes(TEST_CF);
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
private final int TOTAL_ROWS = 4;
/**
* @throws java.lang.Exception exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster();
}
/**
* @throws java.lang.Exception exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private Put createPut(byte[] family, String row) {
Put put = new Put( Bytes.toBytes(row));
put.add(family, QUALIFIER, VALUE);
return put;
}
/**
* This test inserts data into multiple tables and then reads both tables to ensure
* they are in the block cache.
*
* @throws Exception exception
*/
@Test
public void testBlockCacheSummary() throws Exception {
HTable ht = TEST_UTIL.createTable(Bytes.toBytes(TEST_TABLE), FAMILY);
addRows(ht, FAMILY);
HTable ht2 = TEST_UTIL.createTable(Bytes.toBytes(TEST_TABLE2), FAMILY);
addRows(ht2, FAMILY);
TEST_UTIL.flush();
scan(ht, FAMILY);
scan(ht2, FAMILY);
BlockCache bc = StoreFile.getBlockCache(TEST_UTIL.getConfiguration());
List<BlockCacheColumnFamilySummary> bcs =
bc.getBlockCacheColumnFamilySummaries(TEST_UTIL.getConfiguration());
LOG.info("blockCacheSummary: " + bcs);
assertEquals("blockCache summary has entries", 3, bcs.size());
BlockCacheColumnFamilySummary e = bcs.get(0);
assertEquals("table", "-ROOT-", e.getTable());
assertEquals("cf", "info", e.getColumnFamily());
e = bcs.get(1);
assertEquals("table", TEST_TABLE, e.getTable());
assertEquals("cf", TEST_CF, e.getColumnFamily());
e = bcs.get(2);
assertEquals("table", TEST_TABLE2, e.getTable());
assertEquals("cf", TEST_CF, e.getColumnFamily());
}
private void addRows(HTable ht, byte[] family) throws IOException {
List<Row> rows = new ArrayList<Row>();
for (int i = 0; i < TOTAL_ROWS;i++) {
rows.add(createPut(family, "row" + i));
}
HTableUtil.bucketRsBatch( ht, rows);
}
private void scan(HTable ht, byte[] family) throws IOException {
Scan scan = new Scan();
scan.addColumn(family, QUALIFIER);
int count = 0;
for(@SuppressWarnings("unused") Result result : ht.getScanner(scan)) {
count++;
}
if (TOTAL_ROWS != count) {
throw new IOException("Incorrect number of rows!");
}
}
}