HBASE-21775 ADDENDUM - fix TestAsyncProcess

This commit is contained in:
Tommy Li 2019-01-30 13:19:07 -08:00 committed by stack
parent 5ddda1a1f6
commit 513ba9ac59
1 changed files with 27 additions and 19 deletions

View File

@ -69,7 +69,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -91,9 +91,8 @@ public class TestAsyncProcess {
private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
private static final byte[] FAILS = Bytes.toBytes("FAILS"); private static final byte[] FAILS = Bytes.toBytes("FAILS");
private static final Configuration CONF = new Configuration(); private Configuration CONF;
private static final ConnectionConfiguration CONNECTION_CONFIG = private ConnectionConfiguration CONNECTION_CONFIG;
new ConnectionConfiguration(CONF);
private static final ServerName sn = ServerName.valueOf("s1,1,1"); private static final ServerName sn = ServerName.valueOf("s1,1,1");
private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); private static final ServerName sn2 = ServerName.valueOf("s2,2,2");
private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); private static final ServerName sn3 = ServerName.valueOf("s3,3,3");
@ -123,13 +122,18 @@ public class TestAsyncProcess {
private static final int NB_RETRIES = 3; private static final int NB_RETRIES = 3;
private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, private int RPC_TIMEOUT;
HConstants.DEFAULT_HBASE_RPC_TIMEOUT); private int OPERATION_TIMEOUT;
private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @Before
@BeforeClass public void beforeEach() {
public static void beforeClass(){ this.CONF = new Configuration();
CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
} }
static class CountingThreadFactory implements ThreadFactory { static class CountingThreadFactory implements ThreadFactory {
@ -151,6 +155,7 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger(); final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<>(); public List<AsyncRequestFuture> allReqs = new ArrayList<>();
public AtomicInteger callsCt = new AtomicInteger(); public AtomicInteger callsCt = new AtomicInteger();
private Configuration conf;
private long previousTimeout = -1; private long previousTimeout = -1;
final ExecutorService service; final ExecutorService service;
@ -174,6 +179,7 @@ public class TestAsyncProcess {
super(hc, conf, super(hc, conf,
new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5); service = Executors.newFixedThreadPool(5);
this.conf = conf;
} }
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
@ -191,8 +197,10 @@ public class TestAsyncProcess {
.setRowAccess(rows) .setRowAccess(rows)
.setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
.setNeedResults(needResults) .setNeedResults(needResults)
.setRpcTimeout(RPC_TIMEOUT) .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
.setOperationTimeout(OPERATION_TIMEOUT) HConstants.DEFAULT_HBASE_RPC_TIMEOUT))
.setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))
.build(); .build();
return submit(task); return submit(task);
} }
@ -502,8 +510,8 @@ public class TestAsyncProcess {
List<HRegionLocation> hrl; List<HRegionLocation> hrl;
final boolean usedRegions[]; final boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException { protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
super(CONF); super(conf);
this.hrl = hrl; this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()]; this.usedRegions = new boolean[hrl.size()];
} }
@ -1030,7 +1038,7 @@ public class TestAsyncProcess {
} }
} }
private static ClusterConnection createHConnection() throws IOException { private ClusterConnection createHConnection() throws IOException {
ClusterConnection hc = createHConnectionCommon(); ClusterConnection hc = createHConnectionCommon();
setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
@ -1041,7 +1049,7 @@ public class TestAsyncProcess {
return hc; return hc;
} }
private static ClusterConnection createHConnectionWithReplicas() throws IOException { private ClusterConnection createHConnectionWithReplicas() throws IOException {
ClusterConnection hc = createHConnectionCommon(); ClusterConnection hc = createHConnectionCommon();
setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_1, hrls1);
setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_2, hrls2);
@ -1069,7 +1077,7 @@ public class TestAsyncProcess {
Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
} }
private static ClusterConnection createHConnectionCommon() { private ClusterConnection createHConnectionCommon() {
ClusterConnection hc = Mockito.mock(ClusterConnection.class); ClusterConnection hc = Mockito.mock(ClusterConnection.class);
NonceGenerator ng = Mockito.mock(NonceGenerator.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
@ -1424,7 +1432,7 @@ public class TestAsyncProcess {
gets.add(get); gets.add(get);
} }
MyConnectionImpl2 con = new MyConnectionImpl2(hrls); MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF);
MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
ht.multiAp = ap; ht.multiAp = ap;
@ -1611,7 +1619,7 @@ public class TestAsyncProcess {
return ap; return ap;
} }
private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap,
TableName name) { TableName name) {
return new BufferedMutatorParams(name) return new BufferedMutatorParams(name)
.pool(ap.service) .pool(ap.service)