HBASE-3553 Make HTable ThreadPoolExecutor actually launch up to max threads

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1074331 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2011-02-24 22:34:44 +00:00
parent 6d2cd102c5
commit a5057da6cf
3 changed files with 34 additions and 4 deletions

View File

@ -106,6 +106,9 @@ Release 0.90.2 - Unreleased
HBASE-3545 Possible liveness issue with MasterServerAddress in HBASE-3545 Possible liveness issue with MasterServerAddress in
HRegionServer getMaster (Greg Bowyer via Stack) HRegionServer getMaster (Greg Bowyer via Stack)
HBASE-3548 Fix type in documentation of pseudo distributed mode HBASE-3548 Fix type in documentation of pseudo distributed mode
HBASE-3553 HTable ThreadPoolExecutor does not properly initialize
for hbase.htable.threads.max threads
(Himanshu Vashishtha via garyh)
IMPROVEMENTS IMPROVEMENTS
HBASE-3542 MultiGet methods in Thrift HBASE-3542 MultiGet methods in Thrift

View File

@ -190,10 +190,13 @@ public class HTable implements HTableInterface {
// Unfortunately Executors.newCachedThreadPool does not allow us to // Unfortunately Executors.newCachedThreadPool does not allow us to
// set the maximum size of the pool, so we have to do it ourselves. // set the maximum size of the pool, so we have to do it ourselves.
this.pool = new ThreadPoolExecutor(0, nrThreads, // Must also set set corethreadpool size as with a LinkedBlockingQueue,
// a new thread will not be started until the queue is full
this.pool = new ThreadPoolExecutor(nrThreads, nrThreads,
60, TimeUnit.SECONDS, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new LinkedBlockingQueue<Runnable>(),
new DaemonThreadFactory()); new DaemonThreadFactory());
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
} }
public Configuration getConfiguration() { public Configuration getConfiguration() {

View File

@ -20,8 +20,11 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -48,8 +51,9 @@ public class TestMultiParallel {
private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys(); private static final byte [][] KEYS = makeKeys();
private static final int slaves = 2; // also used for testing HTable pool size
@BeforeClass public static void beforeClass() throws Exception { @BeforeClass public static void beforeClass() throws Exception {
UTIL.startMiniCluster(2); UTIL.startMiniCluster(slaves);
HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY)); HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
} }
@ -60,7 +64,7 @@ public class TestMultiParallel {
@Before public void before() throws IOException { @Before public void before() throws IOException {
LOG.info("before"); LOG.info("before");
if (UTIL.ensureSomeRegionServersAvailable(2)) { if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
// Distribute regions // Distribute regions
UTIL.getMiniHBaseCluster().getMaster().balance(); UTIL.getMiniHBaseCluster().getMaster().balance();
} }
@ -462,4 +466,24 @@ public class TestMultiParallel {
validateEmpty(result); validateEmpty(result);
} }
} }
/**
* This is for testing the active number of threads that were used while
* doing a batch operation. It inserts one row per region via the batch
* operation, and then checks the number of active threads.
* For HBASE-3553
* @throws IOException
* @throws InterruptedException
* @throws NoSuchFieldException
* @throws SecurityException
*/
@Test public void testActiveThreadsCount() throws Exception{
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
List<Row> puts = constructPutRequests(); // creates a Put for every region
table.batch(puts);
Field poolField = table.getClass().getDeclaredField("pool");
poolField.setAccessible(true);
ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table);
assertEquals(slaves, tExecutor.getLargestPoolSize());
}
} }