HBASE-2938 Add Thread-Local Behavior To HTable Pool

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1125533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-05-20 20:00:58 +00:00
parent 572ff1d41a
commit 2ed476e3dd
4 changed files with 314 additions and 152 deletions

View File

@ -222,6 +222,8 @@ Release 0.91.0 - Unreleased
HBASE-1476 Multithreaded Compactions
HBASE-3877 Determine Proper Defaults for Compaction ThreadPools
HBASE-3880 Make mapper function in ImportTSV plug-able (Bill Graham)
HBASE-2938 HBASE-2938 Add Thread-Local Behavior To HTable Pool
(Karthick Sankarachary)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -21,14 +21,13 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
/**
* A simple pool of HTable instances.<p>
@ -44,10 +43,10 @@ import org.apache.hadoop.hbase.util.Bytes;
* <p>Pool will manage its own cluster to the cluster. See {@link HConnectionManager}.
*/
public class HTablePool implements Closeable {
private final Map<String, Queue<HTableInterface>> tables =
new ConcurrentHashMap<String, Queue<HTableInterface>>();
private final Configuration config;
private final PoolMap<String, HTableInterface> tables;
private final int maxSize;
private final PoolType poolType;
private final Configuration config;
private final HTableInterfaceFactory tableFactory;
/**
@ -63,16 +62,82 @@ public class HTablePool implements Closeable {
* @param maxSize maximum number of references to keep for each table
*/
public HTablePool(final Configuration config, final int maxSize) {
this(config, maxSize, null);
this(config, maxSize, null, null);
}
/**
* Constructor to set maximum versions and use the specified configuration and
* table factory.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
* @param tableFactory
* table factory
*/
public HTablePool(final Configuration config, final int maxSize,
final HTableInterfaceFactory tableFactory) {
this(config, maxSize, null, PoolType.Reusable);
}
/**
* Constructor to set maximum versions and use the specified configuration and
* pool type.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
* @param tableFactory
* table factory
* @param poolType
* pool type which is one of {@link PoolType#Reusable} or
* {@link PoolType#ThreadLocal}
*/
public HTablePool(final Configuration config, final int maxSize,
final PoolType poolType) {
this(config, maxSize, null, poolType);
}
/**
* Constructor to set maximum versions and use the specified configuration,
* table factory and pool type. The HTablePool supports the
* {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
* type is null or not one of those two values, then it will default to
* {@link PoolType#Reusable}.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
* @param tableFactory
* table factory
* @param poolType
* pool type which is one of {@link PoolType#Reusable} or
* {@link PoolType#ThreadLocal}
*/
public HTablePool(final Configuration config, final int maxSize,
final HTableInterfaceFactory tableFactory, PoolType poolType) {
// Make a new configuration instance so I can safely cleanup when
// done with the pool.
this.config = config == null? new Configuration(): config;
this.maxSize = maxSize;
this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
if (poolType == null) {
this.poolType = PoolType.Reusable;
} else {
switch (poolType) {
case Reusable:
case ThreadLocal:
this.poolType = poolType;
break;
default:
this.poolType = PoolType.Reusable;
break;
}
}
this.tables = new PoolMap<String, HTableInterface>(this.poolType, this.maxSize);
}
/**
@ -84,15 +149,9 @@ public class HTablePool implements Closeable {
* @throws RuntimeException if there is a problem instantiating the HTable
*/
public HTableInterface getTable(String tableName) {
Queue<HTableInterface> queue = tables.get(tableName);
if(queue == null) {
queue = new ConcurrentLinkedQueue<HTableInterface>();
tables.put(tableName, queue);
return createHTable(tableName);
}
HTableInterface table = queue.poll();
HTableInterface table = tables.get(tableName);
if(table == null) {
return createHTable(tableName);
table = createHTable(tableName);
}
return table;
}
@ -117,13 +176,13 @@ public class HTablePool implements Closeable {
* @param table table
*/
public void putTable(HTableInterface table) throws IOException {
Queue<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName()));
if(queue.size() >= maxSize) {
String tableName = Bytes.toString(table.getTableName());
if(tables.size(tableName) >= maxSize) {
// release table instance since we're not reusing it
this.tableFactory.releaseHTableInterface(table);
return;
}
queue.add(table);
tables.put(tableName, table);
}
protected HTableInterface createHTable(String tableName) {
@ -140,14 +199,13 @@ public class HTablePool implements Closeable {
* @param tableName
*/
public void closeTablePool(final String tableName) throws IOException {
Queue<HTableInterface> queue = tables.get(tableName);
if (queue != null) {
HTableInterface table = queue.poll();
while (table != null) {
Collection<HTableInterface> tables = this.tables.values(tableName);
if (tables != null) {
for (HTableInterface table : tables) {
this.tableFactory.releaseHTableInterface(table);
table = queue.poll();
}
}
this.tables.remove(tableName);
}
/**
@ -171,7 +229,6 @@ public class HTablePool implements Closeable {
}
int getCurrentPoolSize(String tableName) {
Queue<HTableInterface> queue = tables.get(tableName);
return queue.size();
return tables.size(tableName);
}
}

View File

@ -21,13 +21,15 @@ package org.apache.hadoop.hbase.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -53,8 +55,7 @@ public class PoolMap<K, V> implements Map<K, V> {
private int poolMaxSize;
private Map<K, Pool<V>> pools = Collections
.synchronizedMap(new HashMap<K, Pool<V>>());
private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
public PoolMap(PoolType poolType, int poolMaxSize) {
this.poolType = poolType;
@ -102,6 +103,19 @@ public class PoolMap<K, V> implements Map<K, V> {
return values;
}
public Collection<V> values(K key) {
Collection<V> values = new ArrayList<V>();
Pool<V> pool = pools.get(key);
if (pool != null) {
Collection<V> poolValues = pool.values();
if (poolValues != null) {
values.addAll(poolValues);
}
}
return values;
}
@Override
public boolean isEmpty() {
return pools.isEmpty();
@ -270,7 +284,7 @@ public class PoolMap<K, V> implements Map<K, V> {
* the type of the resource
*/
@SuppressWarnings("serial")
public class ReusablePool<R> extends LinkedList<R> implements Pool<R> {
public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
private int maxSize;
public ReusablePool(int maxSize) {
@ -314,7 +328,7 @@ public class PoolMap<K, V> implements Map<K, V> {
*
*/
@SuppressWarnings("serial")
class RoundRobinPool<R> extends ArrayList<R> implements Pool<R> {
class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
private int maxSize;
private int nextResource = 0;

View File

@ -22,147 +22,236 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import junit.framework.Assert;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.junit.Test;
/**
* Tests HTablePool.
*/
public class TestHTablePool {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] TABLENAME = Bytes.toBytes("TestHTablePool");
public class TestHTablePool {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte[] TABLENAME = Bytes.toBytes("TestHTablePool");
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLENAME, HConstants.CATALOG_FAMILY);
}
@AfterClass
public static void afterClass() throws IOException {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testTableWithStringName() throws Exception {
HTablePool pool =
new HTablePool(TEST_UTIL.getConfiguration(), Integer.MAX_VALUE);
String tableName = Bytes.toString(TABLENAME);
// Request a table from an empty pool
HTableInterface table = pool.getTable(tableName);
Assert.assertNotNull(table);
// Return the table to the pool
pool.putTable(table);
// Request a table of the same name
HTableInterface sameTable = pool.getTable(tableName);
Assert.assertSame(table, sameTable);
}
@Test
public void testTableWithByteArrayName() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), Integer.MAX_VALUE);
// Request a table from an empty pool
HTableInterface table = pool.getTable(TABLENAME);
Assert.assertNotNull(table);
// Return the table to the pool
pool.putTable(table);
// Request a table of the same name
HTableInterface sameTable = pool.getTable(TABLENAME);
Assert.assertSame(table, sameTable);
}
@Test
public void testTableWithMaxSize() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2);
// Request tables from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(TABLENAME);
HTableInterface table3 = pool.getTable(TABLENAME);
// Return the tables to the pool
pool.putTable(table1);
pool.putTable(table2);
// The pool should reject this one since it is already full
pool.putTable(table3);
// Request tables of the same name
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(TABLENAME);
HTableInterface sameTable3 = pool.getTable(TABLENAME);
Assert.assertSame(table1, sameTable1);
Assert.assertSame(table2, sameTable2);
Assert.assertNotSame(table3, sameTable3);
}
@Test
public void testTablesWithDifferentNames() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), Integer.MAX_VALUE);
byte [] otherTable = Bytes.toBytes("OtherTable");
TEST_UTIL.createTable(otherTable, HConstants.CATALOG_FAMILY);
// Request a table from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(otherTable);
Assert.assertNotNull(table2);
// Return the tables to the pool
pool.putTable(table1);
pool.putTable(table2);
// Request tables of the same names
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(otherTable);
Assert.assertSame(table1, sameTable1);
Assert.assertSame(table2, sameTable2);
}
@Test
public void testCloseTablePool() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4);
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
if (admin.tableExists(TABLENAME)) {
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
public abstract static class TestHTablePoolType extends TestCase {
protected void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLENAME, HConstants.CATALOG_FAMILY);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME);
tableDescriptor.addFamily(new HColumnDescriptor("randomFamily"));
admin.createTable(tableDescriptor);
// Request tables from an empty pool
HTableInterface[] tables = new HTableInterface[4];
for (int i = 0; i < 4; ++i ) {
tables[i] = pool.getTable(TABLENAME);
protected void tearDown() throws IOException {
TEST_UTIL.shutdownMiniCluster();
}
pool.closeTablePool(TABLENAME);
protected abstract PoolType getPoolType();
for (int i = 0; i < 4; ++i ) {
pool.putTable(tables[i]);
@Test
public void testTableWithStringName() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
String tableName = Bytes.toString(TABLENAME);
// Request a table from an empty pool
HTableInterface table = pool.getTable(tableName);
Assert.assertNotNull(table);
// Return the table to the pool
pool.putTable(table);
// Request a table of the same name
HTableInterface sameTable = pool.getTable(tableName);
Assert.assertSame(table, sameTable);
}
Assert.assertEquals(4, pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
@Test
public void testTableWithByteArrayName() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
pool.closeTablePool(TABLENAME);
// Request a table from an empty pool
HTableInterface table = pool.getTable(TABLENAME);
Assert.assertNotNull(table);
Assert.assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
// Return the table to the pool
pool.putTable(table);
// Request a table of the same name
HTableInterface sameTable = pool.getTable(TABLENAME);
Assert.assertSame(table, sameTable);
}
@Test
public void testTablesWithDifferentNames() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
byte[] otherTable = Bytes.toBytes("OtherTable");
TEST_UTIL.createTable(otherTable, HConstants.CATALOG_FAMILY);
// Request a table from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(otherTable);
Assert.assertNotNull(table2);
// Return the tables to the pool
pool.putTable(table1);
pool.putTable(table2);
// Request tables of the same names
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(otherTable);
Assert.assertSame(table1, sameTable1);
Assert.assertSame(table2, sameTable2);
}
}
public static class TestHTableReusablePool extends TestHTablePoolType {
@Override
protected PoolType getPoolType() {
return PoolType.Reusable;
}
@Test
public void testTableWithMaxSize() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
getPoolType());
// Request tables from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(TABLENAME);
HTableInterface table3 = pool.getTable(TABLENAME);
// Return the tables to the pool
pool.putTable(table1);
pool.putTable(table2);
// The pool should reject this one since it is already full
pool.putTable(table3);
// Request tables of the same name
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(TABLENAME);
HTableInterface sameTable3 = pool.getTable(TABLENAME);
Assert.assertSame(table1, sameTable1);
Assert.assertSame(table2, sameTable2);
Assert.assertNotSame(table3, sameTable3);
}
@Test
public void testCloseTablePool() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
getPoolType());
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
if (admin.tableExists(TABLENAME)) {
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME);
tableDescriptor.addFamily(new HColumnDescriptor("randomFamily"));
admin.createTable(tableDescriptor);
// Request tables from an empty pool
HTableInterface[] tables = new HTableInterface[4];
for (int i = 0; i < 4; ++i) {
tables[i] = pool.getTable(TABLENAME);
}
pool.closeTablePool(TABLENAME);
for (int i = 0; i < 4; ++i) {
pool.putTable(tables[i]);
}
Assert
.assertEquals(4, pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
pool.closeTablePool(TABLENAME);
Assert
.assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
}
}
public static class TestHTableThreadLocalPool extends TestHTablePoolType {
@Override
protected PoolType getPoolType() {
return PoolType.ThreadLocal;
}
@Test
public void testTableWithMaxSize() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
getPoolType());
// Request tables from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(TABLENAME);
HTableInterface table3 = pool.getTable(TABLENAME);
// Return the tables to the pool
pool.putTable(table1);
pool.putTable(table2);
// The pool should not reject this one since the number of threads <= 2
pool.putTable(table3);
// Request tables of the same name
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(TABLENAME);
HTableInterface sameTable3 = pool.getTable(TABLENAME);
Assert.assertSame(table3, sameTable1);
Assert.assertSame(table3, sameTable2);
Assert.assertSame(table3, sameTable3);
}
@Test
public void testCloseTablePool() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
getPoolType());
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
if (admin.tableExists(TABLENAME)) {
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME);
tableDescriptor.addFamily(new HColumnDescriptor("randomFamily"));
admin.createTable(tableDescriptor);
// Request tables from an empty pool
HTableInterface[] tables = new HTableInterface[4];
for (int i = 0; i < 4; ++i) {
tables[i] = pool.getTable(TABLENAME);
}
pool.closeTablePool(TABLENAME);
for (int i = 0; i < 4; ++i) {
pool.putTable(tables[i]);
}
Assert
.assertEquals(1, pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
pool.closeTablePool(TABLENAME);
Assert
.assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
}
}
public static junit.framework.Test suite() {
TestSuite suite = new TestSuite();
suite.addTestSuite(TestHTableReusablePool.class);
suite.addTestSuite(TestHTableThreadLocalPool.class);
return suite;
}
}