diff --git a/CHANGES.txt b/CHANGES.txt index f17994d9345..cfa78d71598 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -231,6 +231,7 @@ Release 0.90.3 - Unreleased IMPROVEMENTS HBASE-3747 ReplicationSource should differanciate remote and local exceptions HBASE-3652 Speed up tests by lowering some sleeps + HBASE-3767 Improve how HTable handles threads used for multi actions TASKS HBASE-3748 Add rolling of thrift/rest daemons to graceful_stop.sh script diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index bb3a8faec1f..edacf5654f1 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.zookeeper.KeeperException; /** * Used to communicate with a single HBase table. @@ -184,18 +182,18 @@ public class HTable implements HTableInterface { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); - int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS()); - if (nrThreads == 0) { - nrThreads = 1; // is there a better default? + int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); + if (maxThreads == 0) { + maxThreads = 1; // is there a better default? } - // Unfortunately Executors.newCachedThreadPool does not allow us to - // set the maximum size of the pool, so we have to do it ourselves. - // Must also set set corethreadpool size as with a LinkedBlockingQueue, - // a new thread will not be started until the queue is full - this.pool = new ThreadPoolExecutor(nrThreads, nrThreads, + // Using the "direct handoff" approach, new threads will only be created + // if it is necessary and will grow unbounded. This could be bad but in HCM + // we only create as many Runnables as there are region servers. It means + // it also scales when new region servers are added. + this.pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue(), + new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); } @@ -204,21 +202,6 @@ public class HTable implements HTableInterface { return configuration; } - /** - * @return the number of region servers that are currently running - * @throws IOException if a remote or network exception occurs - */ - public int getCurrentNrHRS() throws IOException { - try { - // We go to zk rather than to master to get count of regions to avoid - // HTable having a Master dependency. See HBase-2828 - return ZKUtil.getNumberOfChildren(this.connection.getZooKeeperWatcher(), - this.connection.getZooKeeperWatcher().rsZNode); - } catch (KeeperException ke) { - throw new IOException("Unexpected ZooKeeper exception", ke); - } - } - /** * Tells whether or not a table is enabled or not. * @param tableName Name of table to check. @@ -1258,6 +1241,14 @@ public class HTable implements HTableInterface { } } + /** + * The pool is used for mutli requests for this HTable + * @return the pool used for mutli + */ + ExecutorService getPool() { + return this.pool; + } + static class DaemonThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 199b7aeea5d..9ad0dadc9b4 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -34,9 +34,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.UUID; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -512,7 +515,7 @@ public class TestFromClientSide { assertEquals(count, 10); scanner.close(); } - + /** * Test simple table and non-existent row cases. */ @@ -3970,5 +3973,65 @@ public class TestFromClientSide { assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); } } + + /** + * This test demonstrates how we use ThreadPoolExecutor. + * It needs to show that we only use as many threads in the pool as we have + * region servers. To do this, instead of doing real requests, we use a + * SynchronousQueue where each put must wait for a take (and vice versa) + * so that way we have full control of the number of active threads. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testPoolBehavior() throws IOException, InterruptedException { + byte[] someBytes = Bytes.toBytes("pool"); + HTable table = TEST_UTIL.createTable(someBytes, someBytes); + ThreadPoolExecutor pool = (ThreadPoolExecutor)table.getPool(); + + // Make sure that the TPE stars with a core pool size of one and 0 + // initialized worker threads + assertEquals(1, pool.getCorePoolSize()); + assertEquals(0, pool.getPoolSize()); + + // Build a SynchronousQueue that we use for thread coordination + final SynchronousQueue queue = new SynchronousQueue(); + List threads = new ArrayList(5); + for (int i = 0; i < 5; i++) { + threads.add(new Thread() { + public void run() { + try { + // The thread blocks here until we decide to let it go + queue.take(); + } catch (InterruptedException ie) { } + } + }); + } + // First, add two threads and make sure the pool size follows + pool.submit(threads.get(0)); + assertEquals(1, pool.getPoolSize()); + pool.submit(threads.get(1)); + assertEquals(2, pool.getPoolSize()); + + // Next, terminate those threads and then make sure the pool is still the + // same size + queue.put(new Object()); + threads.get(0).join(); + queue.put(new Object()); + threads.get(1).join(); + assertEquals(2, pool.getPoolSize()); + + // Now let's simulate adding a RS meaning that we'll go up to three + // concurrent threads. The pool should not grow larger than three. + pool.submit(threads.get(2)); + pool.submit(threads.get(3)); + pool.submit(threads.get(4)); + assertEquals(3, pool.getPoolSize()); + queue.put(new Object()); + queue.put(new Object()); + queue.put(new Object()); + } + + }