From 77ca737375ab08c10f3416eea284fbd04f21a9b3 Mon Sep 17 00:00:00 2001 From: nkeywal Date: Fri, 27 Sep 2013 08:22:56 +0000 Subject: [PATCH] HBASE-9647 Add a test in TestAsyncProcess to check the number of threads created git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1526828 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/TestAsyncProcess.java | 100 ++++++++++++++++-- 1 file changed, 93 insertions(+), 7 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d6984dcc1f0..9c2b7075243 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; import org.junit.Test; @@ -39,8 +40,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +59,7 @@ public class TestAsyncProcess { private static final Configuration conf = new Configuration(); private static ServerName sn = new ServerName("localhost:10,1254"); + private static ServerName sn2 = new ServerName("localhost:140,12540"); private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2); private static HRegionInfo hri2 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW); @@ -66,15 +70,38 @@ public class TestAsyncProcess { private static Exception failure = new Exception("failure"); static class MyAsyncProcess extends AsyncProcess { + final AtomicInteger nbMultiResponse = new AtomicInteger(); + final AtomicInteger nbActions = new AtomicInteger(); + + static class CountingThreadFactory implements ThreadFactory { + final AtomicInteger nbThreads; + ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); + @Override + public Thread newThread(Runnable r) { + nbThreads.incrementAndGet(); + return realFactory.newThread(r); + } + + CountingThreadFactory(AtomicInteger nbThreads){ + this.nbThreads = nbThreads; + } + } + public MyAsyncProcess(HConnection hc, AsyncProcessCallback callback, Configuration conf) { - super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, - new SynchronousQueue(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")), + this(hc, callback, conf, new AtomicInteger()); + } + + public MyAsyncProcess(HConnection hc, AsyncProcessCallback callback, Configuration conf, + AtomicInteger nbThreads) { + super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, + new SynchronousQueue(), new CountingThreadFactory(nbThreads)), callback, conf, new RpcRetryingCallerFactory(conf)); } @Override protected RpcRetryingCaller createCaller(MultiServerCallable callable) { - final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti()); + final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(), + nbMultiResponse, nbActions); return new RpcRetryingCaller(conf) { @Override public MultiResponse callWithoutRetries( RetryingCallable callable) @@ -86,10 +113,12 @@ public class TestAsyncProcess { } static MultiResponse createMultiResponse(final HRegionLocation loc, - final MultiAction multi) { + final MultiAction multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) { final MultiResponse mr = new MultiResponse(); + nbMultiResponse.incrementAndGet(); for (Map.Entry>> entry : multi.actions.entrySet()) { for (Action a : entry.getValue()) { + nbActions.incrementAndGet(); if (Arrays.equals(FAILS, a.getAction().getRow())) { mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure); } else { @@ -99,12 +128,12 @@ public class TestAsyncProcess { } return mr; } - /** * Returns our async process. */ static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation { MyAsyncProcess ap; + final AtomicInteger nbThreads = new AtomicInteger(0); final static Configuration c = new Configuration(); static { @@ -123,8 +152,8 @@ public class TestAsyncProcess { protected AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool, AsyncProcess.AsyncProcessCallback callback, - Configuration conf) { - ap = new MyAsyncProcess(this, callback, conf); + Configuration confn ) { + ap = new MyAsyncProcess(this, callback, c, nbThreads); return ap; } @@ -133,7 +162,29 @@ public class TestAsyncProcess { final byte[] row) { return loc1; } + } + /** + * Returns our async process. + */ + static class MyConnectionImpl2 extends MyConnectionImpl { + List hrl; + boolean usedRegions[]; + + protected MyConnectionImpl2(List hrl) { + super(c); + this.hrl = hrl; + this.usedRegions = new boolean[hrl.size()]; + } + + @Override + public HRegionLocation locateRegion(final TableName tableName, + final byte[] row) { + Random rd = new Random(Bytes.toLong(row)); + int pos = rd.nextInt(hrl.size()); + usedRegions[pos] = true; + return hrl.get(pos); + } } @Test @@ -613,6 +664,41 @@ public class TestAsyncProcess { Assert.assertTrue((System.currentTimeMillis() - start) < 10000); } + /** + * This test simulates multiple regions on 2 servers. We should have 2 multi requests and + * 2 threads: 1 per server, this whatever the number of regions. + */ + @Test + public void testThreadCreation() throws Exception { + final int NB_REGS = 10000; + List hrls = new ArrayList(NB_REGS); + List gets = new ArrayList(NB_REGS); + for (int i = 0; i < NB_REGS; i++) { + HRegionInfo hri = new HRegionInfo( + DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L)); + HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); + hrls.add(hrl); + + Get get = new Get(Bytes.toBytes(i * 10L + 5L)); + gets.add(get); + } + + HTable ht = new HTable(); + MyConnectionImpl2 con = new MyConnectionImpl2(hrls); + ht.connection = con; + ht.batch((List) gets); + + Assert.assertEquals(con.ap.nbActions.get(), NB_REGS); + Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server + Assert.assertEquals(con.nbThreads.get(), 2); // 1 thread per server + + int nbReg = 0; + for (int i =0; i