From e8a34fb3845ddce36dc3aa2afc0b5f7fce62a77d Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Wed, 18 Feb 2015 10:23:02 -0800 Subject: [PATCH] HBASE-13036. Meta scanner should use its own threadpool --- .../hbase/client/ConnectionManager.java | 103 ++++++++++++------ .../apache/hadoop/hbase/client/HTable.java | 1 + .../hbase/client/TestMetaWithReplicas.java | 20 ++++ 3 files changed, 90 insertions(+), 34 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 4b267c0a9f3..17d13781ab8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -564,6 +564,9 @@ class ConnectionManager { // thread executor shared by all HTableInterface instances created // by this connection private volatile ExecutorService batchPool = null; + // meta thread executor shared by all HTableInterface instances created + // by this connection + private volatile ExecutorService metaLookupPool = null; private volatile boolean cleanupPool = false; private final Configuration conf; @@ -750,52 +753,84 @@ class ConnectionManager { private ExecutorService getBatchPool() { if (batchPool == null) { - // shared HTable thread executor not yet initialized synchronized (this) { if (batchPool == null) { - int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256); - int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256); - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; - } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } - long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - LinkedBlockingQueue workQueue = - new LinkedBlockingQueue(maxThreads * - conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); - ThreadPoolExecutor tpe = new ThreadPoolExecutor( - coreThreads, - maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - Threads.newDaemonThreadFactory(toString() + "-shared-")); - tpe.allowCoreThreadTimeOut(true); - this.batchPool = tpe; + this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), + conf.getInt("hbase.hconnection.threads.core", 256), "-shared-"); + this.cleanupPool = true; } - this.cleanupPool = true; } } return this.batchPool; } + private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) { + // shared HTable thread executor not yet initialized + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue(maxThreads * + conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + Threads.newDaemonThreadFactory(toString() + "-shared-")); + tpe.allowCoreThreadTimeOut(true); + return tpe; + } + + private ExecutorService getMetaLookupPool() { + if (this.metaLookupPool == null) { + synchronized (this) { + if (this.metaLookupPool == null) { + //The meta lookup can happen on replicas of the meta (if the appropriate configs + //are enabled).In a replicated-meta setup, the number '3' is assumed as the max + //number of replicas by default (unless it is configured to be of a higher value). + //In a non-replicated-meta setup, only one thread would be active. + this.metaLookupPool = getThreadPool( + conf.getInt("hbase.hconnection.meta.lookup.threads.max", 3), + conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 3), + "-metaLookup-shared-"); + } + } + } + return this.metaLookupPool; + } + + protected ExecutorService getCurrentMetaLookupPool() { + return metaLookupPool; + } + protected ExecutorService getCurrentBatchPool() { return batchPool; } - private void shutdownBatchPool() { + private void shutdownPools() { if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { - this.batchPool.shutdown(); - try { - if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) { - this.batchPool.shutdownNow(); - } - } catch (InterruptedException e) { - this.batchPool.shutdownNow(); + shutdownBatchPool(this.batchPool); + } + if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { + shutdownBatchPool(this.metaLookupPool); + } + } + + private void shutdownBatchPool(ExecutorService pool) { + pool.shutdown(); + try { + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); } + } catch (InterruptedException e) { + pool.shutdownNow(); } } @@ -1191,7 +1226,7 @@ class ConnectionManager { ReversedClientScanner rcs = null; try { rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, - rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0); + rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0); regionInfoRow = rcs.next(); } finally { if (rcs != null) { @@ -2277,7 +2312,7 @@ class ConnectionManager { return; } closeMaster(); - shutdownBatchPool(); + shutdownPools(); this.closed = true; closeZooKeeperWatcher(); this.stubs.clear(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 4afaf227dce..d36cd0f1412 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1447,6 +1447,7 @@ public class HTable implements HTableInterface { terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); } while (!terminated); } catch (InterruptedException e) { + this.pool.shutdownNow(); LOG.warn("waitForTermination interrupted"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java index dcd4a5e2b0f..31188212c03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client; import javax.annotation.Nullable; + import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -221,6 +224,23 @@ public class TestMetaWithReplicas { assertTrue(Arrays.equals(r.getRow(), row)); } + @Test + public void testMetaLookupThreadPoolCreated() throws Exception { + byte[] TABLE = Bytes.toBytes("testMetaLookupThreadPoolCreated"); + byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; + if (TEST_UTIL.getHBaseAdmin().tableExists(TABLE)) { + TEST_UTIL.getHBaseAdmin().disableTable(TABLE); + TEST_UTIL.getHBaseAdmin().deleteTable(TABLE); + } + Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration()); + byte[] row = "test".getBytes(); + HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection); + // check that metalookup pool would get created + c.relocateRegion(TABLE, row); + ExecutorService ex = c.getCurrentMetaLookupPool(); + assert(ex != null); + } + @Test public void testChangingReplicaCount() throws Exception { // tests changing the replica count across master restarts