HBASE-13036. Meta scanner should use its own threadpool

This commit is contained in:
Devaraj Das 2015-02-18 10:23:02 -08:00
parent eb1f46f2b8
commit e8a34fb384
3 changed files with 90 additions and 34 deletions

View File

@ -564,6 +564,9 @@ class ConnectionManager {
// thread executor shared by all HTableInterface instances created // thread executor shared by all HTableInterface instances created
// by this connection // by this connection
private volatile ExecutorService batchPool = null; private volatile ExecutorService batchPool = null;
// meta thread executor shared by all HTableInterface instances created
// by this connection
private volatile ExecutorService metaLookupPool = null;
private volatile boolean cleanupPool = false; private volatile boolean cleanupPool = false;
private final Configuration conf; private final Configuration conf;
@ -750,52 +753,84 @@ class ConnectionManager {
private ExecutorService getBatchPool() { private ExecutorService getBatchPool() {
if (batchPool == null) { if (batchPool == null) {
// shared HTable thread executor not yet initialized
synchronized (this) { synchronized (this) {
if (batchPool == null) { if (batchPool == null) {
int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256); this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256); conf.getInt("hbase.hconnection.threads.core", 256), "-shared-");
if (maxThreads == 0) { this.cleanupPool = true;
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(toString() + "-shared-"));
tpe.allowCoreThreadTimeOut(true);
this.batchPool = tpe;
} }
this.cleanupPool = true;
} }
} }
return this.batchPool; return this.batchPool;
} }
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) {
// shared HTable thread executor not yet initialized
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(toString() + "-shared-"));
tpe.allowCoreThreadTimeOut(true);
return tpe;
}
private ExecutorService getMetaLookupPool() {
if (this.metaLookupPool == null) {
synchronized (this) {
if (this.metaLookupPool == null) {
//The meta lookup can happen on replicas of the meta (if the appropriate configs
//are enabled).In a replicated-meta setup, the number '3' is assumed as the max
//number of replicas by default (unless it is configured to be of a higher value).
//In a non-replicated-meta setup, only one thread would be active.
this.metaLookupPool = getThreadPool(
conf.getInt("hbase.hconnection.meta.lookup.threads.max", 3),
conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 3),
"-metaLookup-shared-");
}
}
}
return this.metaLookupPool;
}
protected ExecutorService getCurrentMetaLookupPool() {
return metaLookupPool;
}
protected ExecutorService getCurrentBatchPool() { protected ExecutorService getCurrentBatchPool() {
return batchPool; return batchPool;
} }
private void shutdownBatchPool() { private void shutdownPools() {
if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
this.batchPool.shutdown(); shutdownBatchPool(this.batchPool);
try { }
if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) { if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
this.batchPool.shutdownNow(); shutdownBatchPool(this.metaLookupPool);
} }
} catch (InterruptedException e) { }
this.batchPool.shutdownNow();
private void shutdownBatchPool(ExecutorService pool) {
pool.shutdown();
try {
if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
pool.shutdownNow();
} }
} catch (InterruptedException e) {
pool.shutdownNow();
} }
} }
@ -1191,7 +1226,7 @@ class ConnectionManager {
ReversedClientScanner rcs = null; ReversedClientScanner rcs = null;
try { try {
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0); rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
regionInfoRow = rcs.next(); regionInfoRow = rcs.next();
} finally { } finally {
if (rcs != null) { if (rcs != null) {
@ -2277,7 +2312,7 @@ class ConnectionManager {
return; return;
} }
closeMaster(); closeMaster();
shutdownBatchPool(); shutdownPools();
this.closed = true; this.closed = true;
closeZooKeeperWatcher(); closeZooKeeperWatcher();
this.stubs.clear(); this.stubs.clear();

View File

@ -1447,6 +1447,7 @@ public class HTable implements HTableInterface {
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
} while (!terminated); } while (!terminated);
} catch (InterruptedException e) { } catch (InterruptedException e) {
this.pool.shutdownNow();
LOG.warn("waitForTermination interrupted"); LOG.warn("waitForTermination interrupted");
} }
} }

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -221,6 +224,23 @@ public class TestMetaWithReplicas {
assertTrue(Arrays.equals(r.getRow(), row)); assertTrue(Arrays.equals(r.getRow(), row));
} }
@Test
public void testMetaLookupThreadPoolCreated() throws Exception {
byte[] TABLE = Bytes.toBytes("testMetaLookupThreadPoolCreated");
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
if (TEST_UTIL.getHBaseAdmin().tableExists(TABLE)) {
TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
}
Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());
byte[] row = "test".getBytes();
HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection);
// check that metalookup pool would get created
c.relocateRegion(TABLE, row);
ExecutorService ex = c.getCurrentMetaLookupPool();
assert(ex != null);
}
@Test @Test
public void testChangingReplicaCount() throws Exception { public void testChangingReplicaCount() throws Exception {
// tests changing the replica count across master restarts // tests changing the replica count across master restarts