From a5057da6cf54d1982f1d97da8eda78e73f5c6187 Mon Sep 17 00:00:00 2001 From: Gary Helmling Date: Thu, 24 Feb 2011 22:34:44 +0000 Subject: [PATCH] HBASE-3553 Make HTable ThreadPoolExecutor actually launch up to max threads git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1074331 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 ++ .../apache/hadoop/hbase/client/HTable.java | 5 +++- .../hbase/client/TestMultiParallel.java | 30 +++++++++++++++++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 47872a6470c..4e61fcfc9c3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -106,6 +106,9 @@ Release 0.90.2 - Unreleased HBASE-3545 Possible liveness issue with MasterServerAddress in HRegionServer getMaster (Greg Bowyer via Stack) HBASE-3548 Fix type in documentation of pseudo distributed mode + HBASE-3553 HTable ThreadPoolExecutor does not properly initialize + for hbase.htable.threads.max threads + (Himanshu Vashishtha via garyh) IMPROVEMENTS HBASE-3542 MultiGet methods in Thrift diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a5a90db6aaa..43ce6a9684c 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -190,10 +190,13 @@ public class HTable implements HTableInterface { // Unfortunately Executors.newCachedThreadPool does not allow us to // set the maximum size of the pool, so we have to do it ourselves. - this.pool = new ThreadPoolExecutor(0, nrThreads, + // Must also set set corethreadpool size as with a LinkedBlockingQueue, + // a new thread will not be started until the queue is full + this.pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DaemonThreadFactory()); + ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); } public Configuration getConfiguration() { diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 6974f889523..081ce769a58 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,8 +51,9 @@ public class TestMultiParallel { private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); + private static final int slaves = 2; // also used for testing HTable pool size @BeforeClass public static void beforeClass() throws Exception { - UTIL.startMiniCluster(2); + UTIL.startMiniCluster(slaves); HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY)); UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); } @@ -60,7 +64,7 @@ public class TestMultiParallel { @Before public void before() throws IOException { LOG.info("before"); - if (UTIL.ensureSomeRegionServersAvailable(2)) { + if (UTIL.ensureSomeRegionServersAvailable(slaves)) { // Distribute regions UTIL.getMiniHBaseCluster().getMaster().balance(); } @@ -462,4 +466,24 @@ public class TestMultiParallel { validateEmpty(result); } } -} \ No newline at end of file + + /** + * This is for testing the active number of threads that were used while + * doing a batch operation. It inserts one row per region via the batch + * operation, and then checks the number of active threads. + * For HBASE-3553 + * @throws IOException + * @throws InterruptedException + * @throws NoSuchFieldException + * @throws SecurityException + */ + @Test public void testActiveThreadsCount() throws Exception{ + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + List puts = constructPutRequests(); // creates a Put for every region + table.batch(puts); + Field poolField = table.getClass().getDeclaredField("pool"); + poolField.setAccessible(true); + ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table); + assertEquals(slaves, tExecutor.getLargestPoolSize()); + } +}