HBASE-3767 Improve how HTable handles threads used for multi actions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1094741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2011-04-18 20:58:42 +00:00
parent 879ff4a39c
commit 24444f3c85
3 changed files with 83 additions and 28 deletions

View File

@ -231,6 +231,7 @@ Release 0.90.3 - Unreleased
IMPROVEMENTS
HBASE-3747 ReplicationSource should differanciate remote and local exceptions
HBASE-3652 Speed up tests by lowering some sleeps
HBASE-3767 Improve how HTable handles threads used for multi actions
TASKS
HBASE-3748 Add rolling of thrift/rest daemons to graceful_stop.sh script

View File

@ -31,7 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
/**
* Used to communicate with a single HBase table.
@ -184,18 +182,18 @@ public class HTable implements HTableInterface {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
if (nrThreads == 0) {
nrThreads = 1; // is there a better default?
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
if (maxThreads == 0) {
maxThreads = 1; // is there a better default?
}
// Unfortunately Executors.newCachedThreadPool does not allow us to
// set the maximum size of the pool, so we have to do it ourselves.
// Must also set set corethreadpool size as with a LinkedBlockingQueue,
// a new thread will not be started until the queue is full
this.pool = new ThreadPoolExecutor(nrThreads, nrThreads,
// Using the "direct handoff" approach, new threads will only be created
// if it is necessary and will grow unbounded. This could be bad but in HCM
// we only create as many Runnables as there are region servers. It means
// it also scales when new region servers are added.
this.pool = new ThreadPoolExecutor(1, maxThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory());
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
}
@ -204,21 +202,6 @@ public class HTable implements HTableInterface {
return configuration;
}
/**
* @return the number of region servers that are currently running
* @throws IOException if a remote or network exception occurs
*/
public int getCurrentNrHRS() throws IOException {
try {
// We go to zk rather than to master to get count of regions to avoid
// HTable having a Master dependency. See HBase-2828
return ZKUtil.getNumberOfChildren(this.connection.getZooKeeperWatcher(),
this.connection.getZooKeeperWatcher().rsZNode);
} catch (KeeperException ke) {
throw new IOException("Unexpected ZooKeeper exception", ke);
}
}
/**
* Tells whether or not a table is enabled or not.
* @param tableName Name of table to check.
@ -1258,6 +1241,14 @@ public class HTable implements HTableInterface {
}
}
/**
* The pool is used for mutli requests for this HTable
* @return the pool used for mutli
*/
ExecutorService getPool() {
return this.pool;
}
static class DaemonThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;

View File

@ -34,9 +34,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -512,7 +515,7 @@ public class TestFromClientSide {
assertEquals(count, 10);
scanner.close();
}
/**
* Test simple table and non-existent row cases.
*/
@ -3970,5 +3973,65 @@ public class TestFromClientSide {
assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
}
}
/**
* This test demonstrates how we use ThreadPoolExecutor.
* It needs to show that we only use as many threads in the pool as we have
* region servers. To do this, instead of doing real requests, we use a
* SynchronousQueue where each put must wait for a take (and vice versa)
* so that way we have full control of the number of active threads.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testPoolBehavior() throws IOException, InterruptedException {
byte[] someBytes = Bytes.toBytes("pool");
HTable table = TEST_UTIL.createTable(someBytes, someBytes);
ThreadPoolExecutor pool = (ThreadPoolExecutor)table.getPool();
// Make sure that the TPE stars with a core pool size of one and 0
// initialized worker threads
assertEquals(1, pool.getCorePoolSize());
assertEquals(0, pool.getPoolSize());
// Build a SynchronousQueue that we use for thread coordination
final SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
List<Thread> threads = new ArrayList<Thread>(5);
for (int i = 0; i < 5; i++) {
threads.add(new Thread() {
public void run() {
try {
// The thread blocks here until we decide to let it go
queue.take();
} catch (InterruptedException ie) { }
}
});
}
// First, add two threads and make sure the pool size follows
pool.submit(threads.get(0));
assertEquals(1, pool.getPoolSize());
pool.submit(threads.get(1));
assertEquals(2, pool.getPoolSize());
// Next, terminate those threads and then make sure the pool is still the
// same size
queue.put(new Object());
threads.get(0).join();
queue.put(new Object());
threads.get(1).join();
assertEquals(2, pool.getPoolSize());
// Now let's simulate adding a RS meaning that we'll go up to three
// concurrent threads. The pool should not grow larger than three.
pool.submit(threads.get(2));
pool.submit(threads.get(3));
pool.submit(threads.get(4));
assertEquals(3, pool.getPoolSize());
queue.put(new Object());
queue.put(new Object());
queue.put(new Object());
}
}