HBASE-13036. Meta scanner should use its own threadpool

This commit is contained in:
Devaraj Das 2015-02-18 10:23:02 -08:00
parent 1b84101fe3
commit 14bb352b02
3 changed files with 90 additions and 34 deletions

View File

@ -579,6 +579,9 @@ final class ConnectionManager {
// thread executor shared by all HTableInterface instances created
// by this connection
private volatile ExecutorService batchPool = null;
// meta thread executor shared by all HTableInterface instances created
// by this connection
private volatile ExecutorService metaLookupPool = null;
private volatile boolean cleanupPool = false;
private final Configuration conf;
@ -765,52 +768,84 @@ final class ConnectionManager {
private ExecutorService getBatchPool() {
if (batchPool == null) {
// shared HTable thread executor not yet initialized
synchronized (this) {
if (batchPool == null) {
int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256);
int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(toString() + "-shared-"));
tpe.allowCoreThreadTimeOut(true);
this.batchPool = tpe;
this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
conf.getInt("hbase.hconnection.threads.core", 256), "-shared-");
this.cleanupPool = true;
}
this.cleanupPool = true;
}
}
return this.batchPool;
}
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) {
// shared HTable thread executor not yet initialized
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(toString() + "-shared-"));
tpe.allowCoreThreadTimeOut(true);
return tpe;
}
private ExecutorService getMetaLookupPool() {
if (this.metaLookupPool == null) {
synchronized (this) {
if (this.metaLookupPool == null) {
//The meta lookup can happen on replicas of the meta (if the appropriate configs
//are enabled).In a replicated-meta setup, the number '3' is assumed as the max
//number of replicas by default (unless it is configured to be of a higher value).
//In a non-replicated-meta setup, only one thread would be active.
this.metaLookupPool = getThreadPool(
conf.getInt("hbase.hconnection.meta.lookup.threads.max", 3),
conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 3),
"-metaLookup-shared-");
}
}
}
return this.metaLookupPool;
}
protected ExecutorService getCurrentMetaLookupPool() {
return metaLookupPool;
}
protected ExecutorService getCurrentBatchPool() {
return batchPool;
}
private void shutdownBatchPool() {
private void shutdownPools() {
if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
this.batchPool.shutdown();
try {
if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
this.batchPool.shutdownNow();
}
} catch (InterruptedException e) {
this.batchPool.shutdownNow();
shutdownBatchPool(this.batchPool);
}
if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
shutdownBatchPool(this.metaLookupPool);
}
}
private void shutdownBatchPool(ExecutorService pool) {
pool.shutdown();
try {
if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
}
}
@ -1206,7 +1241,7 @@ final class ConnectionManager {
ReversedClientScanner rcs = null;
try {
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0);
rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
regionInfoRow = rcs.next();
} finally {
if (rcs != null) {
@ -2327,7 +2362,7 @@ final class ConnectionManager {
return;
}
closeMaster();
shutdownBatchPool();
shutdownPools();
this.closed = true;
closeZooKeeperWatcher();
this.stubs.clear();

View File

@ -1438,6 +1438,7 @@ public class HTable implements HTableInterface {
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
} while (!terminated);
} catch (InterruptedException e) {
this.pool.shutdownNow();
LOG.warn("waitForTermination interrupted");
}
}

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.hbase.client;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -220,6 +223,23 @@ public class TestMetaWithReplicas {
assertTrue(Arrays.equals(r.getRow(), row));
}
@Test
public void testMetaLookupThreadPoolCreated() throws Exception {
byte[] TABLE = Bytes.toBytes("testMetaLookupThreadPoolCreated");
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
if (TEST_UTIL.getHBaseAdmin().tableExists(TABLE)) {
TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
}
Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());
byte[] row = "test".getBytes();
HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection);
// check that metalookup pool would get created
c.relocateRegion(TABLE, row);
ExecutorService ex = c.getCurrentMetaLookupPool();
assert(ex != null);
}
@Test
public void testChangingReplicaCount() throws Exception {
// tests changing the replica count across master restarts