HBASE-9647 Add a test in TestAsyncProcess to check the number of threads created

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1526828 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-09-27 08:22:56 +00:00
parent 20df90d8ee
commit 77ca737375
1 changed files with 93 additions and 7 deletions

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.Test;
@ -39,8 +40,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -56,6 +59,7 @@ public class TestAsyncProcess {
private static final Configuration conf = new Configuration();
private static ServerName sn = new ServerName("localhost:10,1254");
private static ServerName sn2 = new ServerName("localhost:140,12540");
private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
private static HRegionInfo hri2 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
@ -66,15 +70,38 @@ public class TestAsyncProcess {
private static Exception failure = new Exception("failure");
static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
final AtomicInteger nbMultiResponse = new AtomicInteger();
final AtomicInteger nbActions = new AtomicInteger();
static class CountingThreadFactory implements ThreadFactory {
final AtomicInteger nbThreads;
ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
@Override
public Thread newThread(Runnable r) {
nbThreads.incrementAndGet();
return realFactory.newThread(r);
}
CountingThreadFactory(AtomicInteger nbThreads){
this.nbThreads = nbThreads;
}
}
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")),
this(hc, callback, conf, new AtomicInteger());
}
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf,
AtomicInteger nbThreads) {
super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
callback, conf, new RpcRetryingCallerFactory(conf));
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti());
final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(),
nbMultiResponse, nbActions);
return new RpcRetryingCaller<MultiResponse>(conf) {
@Override
public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
@ -86,10 +113,12 @@ public class TestAsyncProcess {
}
static MultiResponse createMultiResponse(final HRegionLocation loc,
final MultiAction<Row> multi) {
final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
final MultiResponse mr = new MultiResponse();
nbMultiResponse.incrementAndGet();
for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
for (Action a : entry.getValue()) {
nbActions.incrementAndGet();
if (Arrays.equals(FAILS, a.getAction().getRow())) {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
} else {
@ -99,12 +128,12 @@ public class TestAsyncProcess {
}
return mr;
}
/**
* Returns our async process.
*/
static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
MyAsyncProcess<?> ap;
final AtomicInteger nbThreads = new AtomicInteger(0);
final static Configuration c = new Configuration();
static {
@ -123,8 +152,8 @@ public class TestAsyncProcess {
protected <R> AsyncProcess createAsyncProcess(TableName tableName,
ExecutorService pool,
AsyncProcess.AsyncProcessCallback<R> callback,
Configuration conf) {
ap = new MyAsyncProcess<R>(this, callback, conf);
Configuration confn ) {
ap = new MyAsyncProcess<R>(this, callback, c, nbThreads);
return ap;
}
@ -133,7 +162,29 @@ public class TestAsyncProcess {
final byte[] row) {
return loc1;
}
}
/**
* Returns our async process.
*/
static class MyConnectionImpl2 extends MyConnectionImpl {
List<HRegionLocation> hrl;
boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) {
super(c);
this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()];
}
@Override
public HRegionLocation locateRegion(final TableName tableName,
final byte[] row) {
Random rd = new Random(Bytes.toLong(row));
int pos = rd.nextInt(hrl.size());
usedRegions[pos] = true;
return hrl.get(pos);
}
}
@Test
@ -613,6 +664,41 @@ public class TestAsyncProcess {
Assert.assertTrue((System.currentTimeMillis() - start) < 10000);
}
/**
* This test simulates multiple regions on 2 servers. We should have 2 multi requests and
* 2 threads: 1 per server, this whatever the number of regions.
*/
@Test
public void testThreadCreation() throws Exception {
final int NB_REGS = 10000;
List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
List<Get> gets = new ArrayList<Get>(NB_REGS);
for (int i = 0; i < NB_REGS; i++) {
HRegionInfo hri = new HRegionInfo(
DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L));
HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
hrls.add(hrl);
Get get = new Get(Bytes.toBytes(i * 10L + 5L));
gets.add(get);
}
HTable ht = new HTable();
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
ht.connection = con;
ht.batch((List) gets);
Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
Assert.assertEquals(con.nbThreads.get(), 2); // 1 thread per server
int nbReg = 0;
for (int i =0; i<NB_REGS; i++){
if (con.usedRegions[i]) nbReg++;
}
Assert.assertTrue("nbReg=" + nbReg, nbReg > NB_REGS / 10);
}
/**
* @param reg1 if true, creates a put on region 1, region 2 otherwise