HBASE-17792 Use a shared thread pool for AtomicityWriter, AtomicGetReader, AtomicScanReader's connections in TestAcidGuarantees (Huaxiang Sun)

This commit is contained in:
Michael Stack 2017-03-16 15:15:28 -07:00
parent 6fb44f7eb8
commit 7c19490bac
1 changed files with 42 additions and 9 deletions

View File

@ -21,6 +21,11 @@ package org.apache.hadoop.hbase;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -43,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -85,6 +91,7 @@ public class TestAcidGuarantees implements Tool {
// when run as main // when run as main
private Configuration conf; private Configuration conf;
private ExecutorService sharedPool = null;
private void createTableIfMissing(boolean useMob) private void createTableIfMissing(boolean useMob)
throws IOException { throws IOException {
@ -117,12 +124,38 @@ public class TestAcidGuarantees implements Tool {
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9);
} }
util = new HBaseTestingUtility(conf); util = new HBaseTestingUtility(conf);
sharedPool = createThreadPool();
} }
public void setHBaseTestingUtil(HBaseTestingUtility util) { public void setHBaseTestingUtil(HBaseTestingUtility util) {
this.util = util; this.util = util;
} }
private ExecutorService createThreadPool() {
int maxThreads = 256;
int coreThreads = 128;
long keepAliveTime = 60;
BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(toString() + "-shared"));
tpe.allowCoreThreadTimeOut(true);
return tpe;
}
public ExecutorService getSharedThreadPool() {
return sharedPool;
}
/** /**
* Thread that does random full-row writes into a table. * Thread that does random full-row writes into a table.
*/ */
@ -136,11 +169,11 @@ public class TestAcidGuarantees implements Tool {
AtomicLong numWritten = new AtomicLong(); AtomicLong numWritten = new AtomicLong();
public AtomicityWriter(TestContext ctx, byte targetRows[][], public AtomicityWriter(TestContext ctx, byte targetRows[][],
byte targetFamilies[][]) throws IOException { byte targetFamilies[][], ExecutorService pool) throws IOException {
super(ctx); super(ctx);
this.targetRows = targetRows; this.targetRows = targetRows;
this.targetFamilies = targetFamilies; this.targetFamilies = targetFamilies;
connection = ConnectionFactory.createConnection(ctx.getConf()); connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
table = connection.getTable(TABLE_NAME); table = connection.getTable(TABLE_NAME);
} }
public void doAnAction() throws Exception { public void doAnAction() throws Exception {
@ -182,11 +215,11 @@ public class TestAcidGuarantees implements Tool {
AtomicLong numRead = new AtomicLong(); AtomicLong numRead = new AtomicLong();
public AtomicGetReader(TestContext ctx, byte targetRow[], public AtomicGetReader(TestContext ctx, byte targetRow[],
byte targetFamilies[][]) throws IOException { byte targetFamilies[][], ExecutorService pool) throws IOException {
super(ctx); super(ctx);
this.targetRow = targetRow; this.targetRow = targetRow;
this.targetFamilies = targetFamilies; this.targetFamilies = targetFamilies;
connection = ConnectionFactory.createConnection(ctx.getConf()); connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
table = connection.getTable(TABLE_NAME); table = connection.getTable(TABLE_NAME);
} }
@ -251,10 +284,10 @@ public class TestAcidGuarantees implements Tool {
AtomicLong numRowsScanned = new AtomicLong(); AtomicLong numRowsScanned = new AtomicLong();
public AtomicScanReader(TestContext ctx, public AtomicScanReader(TestContext ctx,
byte targetFamilies[][]) throws IOException { byte targetFamilies[][], ExecutorService pool) throws IOException {
super(ctx); super(ctx);
this.targetFamilies = targetFamilies; this.targetFamilies = targetFamilies;
connection = ConnectionFactory.createConnection(ctx.getConf()); connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
table = connection.getTable(TABLE_NAME); table = connection.getTable(TABLE_NAME);
} }
@ -344,7 +377,7 @@ public class TestAcidGuarantees implements Tool {
List<AtomicityWriter> writers = Lists.newArrayList(); List<AtomicityWriter> writers = Lists.newArrayList();
for (int i = 0; i < numWriters; i++) { for (int i = 0; i < numWriters; i++) {
AtomicityWriter writer = new AtomicityWriter( AtomicityWriter writer = new AtomicityWriter(
ctx, rows, FAMILIES); ctx, rows, FAMILIES, getSharedThreadPool());
writers.add(writer); writers.add(writer);
ctx.addThread(writer); ctx.addThread(writer);
} }
@ -372,14 +405,14 @@ public class TestAcidGuarantees implements Tool {
List<AtomicGetReader> getters = Lists.newArrayList(); List<AtomicGetReader> getters = Lists.newArrayList();
for (int i = 0; i < numGetters; i++) { for (int i = 0; i < numGetters; i++) {
AtomicGetReader getter = new AtomicGetReader( AtomicGetReader getter = new AtomicGetReader(
ctx, rows[i % numUniqueRows], FAMILIES); ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool());
getters.add(getter); getters.add(getter);
ctx.addThread(getter); ctx.addThread(getter);
} }
List<AtomicScanReader> scanners = Lists.newArrayList(); List<AtomicScanReader> scanners = Lists.newArrayList();
for (int i = 0; i < numScanners; i++) { for (int i = 0; i < numScanners; i++) {
AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES); AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool());
scanners.add(scanner); scanners.add(scanner);
ctx.addThread(scanner); ctx.addThread(scanner);
} }