diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index 569ca898800..15250ac35b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -21,6 +21,11 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.List; import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -43,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -85,6 +91,7 @@ public class TestAcidGuarantees implements Tool { // when run as main private Configuration conf; + private ExecutorService sharedPool = null; private void createTableIfMissing(boolean useMob) throws IOException { @@ -117,12 +124,38 @@ public class TestAcidGuarantees implements Tool { conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9); } util = new HBaseTestingUtility(conf); + sharedPool = createThreadPool(); } public void setHBaseTestingUtil(HBaseTestingUtility util) { this.util = util; } + private ExecutorService createThreadPool() { + + int maxThreads = 256; + int coreThreads = 128; + + long keepAliveTime = 60; + BlockingQueue workQueue = + new LinkedBlockingQueue(maxThreads * + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); + + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + Threads.newDaemonThreadFactory(toString() + "-shared")); + tpe.allowCoreThreadTimeOut(true); + return tpe; + } + + public ExecutorService getSharedThreadPool() { + return sharedPool; + } + /** * Thread that does random full-row writes into a table. */ @@ -136,11 +169,11 @@ public class TestAcidGuarantees implements Tool { AtomicLong numWritten = new AtomicLong(); public AtomicityWriter(TestContext ctx, byte targetRows[][], - byte targetFamilies[][]) throws IOException { + byte targetFamilies[][], ExecutorService pool) throws IOException { super(ctx); this.targetRows = targetRows; this.targetFamilies = targetFamilies; - connection = ConnectionFactory.createConnection(ctx.getConf()); + connection = ConnectionFactory.createConnection(ctx.getConf(), pool); table = connection.getTable(TABLE_NAME); } public void doAnAction() throws Exception { @@ -182,11 +215,11 @@ public class TestAcidGuarantees implements Tool { AtomicLong numRead = new AtomicLong(); public AtomicGetReader(TestContext ctx, byte targetRow[], - byte targetFamilies[][]) throws IOException { + byte targetFamilies[][], ExecutorService pool) throws IOException { super(ctx); this.targetRow = targetRow; this.targetFamilies = targetFamilies; - connection = ConnectionFactory.createConnection(ctx.getConf()); + connection = ConnectionFactory.createConnection(ctx.getConf(), pool); table = connection.getTable(TABLE_NAME); } @@ -251,10 +284,10 @@ public class TestAcidGuarantees implements Tool { AtomicLong numRowsScanned = new AtomicLong(); public AtomicScanReader(TestContext ctx, - byte targetFamilies[][]) throws IOException { + byte targetFamilies[][], ExecutorService pool) throws IOException { super(ctx); this.targetFamilies = targetFamilies; - connection = ConnectionFactory.createConnection(ctx.getConf()); + connection = ConnectionFactory.createConnection(ctx.getConf(), pool); table = connection.getTable(TABLE_NAME); } @@ -344,7 +377,7 @@ public class TestAcidGuarantees implements Tool { List writers = Lists.newArrayList(); for (int i = 0; i < numWriters; i++) { AtomicityWriter writer = new AtomicityWriter( - ctx, rows, FAMILIES); + ctx, rows, FAMILIES, getSharedThreadPool()); writers.add(writer); ctx.addThread(writer); } @@ -372,14 +405,14 @@ public class TestAcidGuarantees implements Tool { List getters = Lists.newArrayList(); for (int i = 0; i < numGetters; i++) { AtomicGetReader getter = new AtomicGetReader( - ctx, rows[i % numUniqueRows], FAMILIES); + ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool()); getters.add(getter); ctx.addThread(getter); } List scanners = Lists.newArrayList(); for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES); + AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool()); scanners.add(scanner); ctx.addThread(scanner); }