HBASE-16794 TestDispatchMergingRegionsProcedure#testMergeRegionsConcurrently is flaky

Signed-off-by: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
This commit is contained in:
ChiaPing Tsai 2016-10-09 16:52:54 -07:00 committed by Matteo Bertozzi
parent e06c3676f1
commit 8a8c60889c
2 changed files with 44 additions and 6 deletions

View File

@ -723,6 +723,11 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
return compactionThroughputController;
}
@VisibleForTesting
public long getCompletedMergeTaskCount() {
return mergePool.getCompletedTaskCount();
}
@VisibleForTesting
/**
* Shutdown the long compaction thread pool.

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.D
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -125,20 +128,21 @@ public class TestDispatchMergingRegionsProcedure {
regionsToMerge[0] = tableRegions.get(0);
regionsToMerge[1] = tableRegions.get(1);
final int initCompletedTaskCount = countOfCompletedMergeTaskCount();
long procId = procExec.submitProcedure(new DispatchMergingRegionsProcedure(
procExec.getEnvironment(), tableName, regionsToMerge, true));
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
assertRegionCount(tableName, 2);
assertRegionCount(tableName, 2, 1, initCompletedTaskCount);
}
/**
* This tests two concurrent region merges
*/
@Test(timeout=90000)
@Test(timeout=60000)
public void testMergeRegionsConcurrently() throws Exception {
final TableName tableName = TableName.valueOf("testMergeTwoRegions");
final TableName tableName = TableName.valueOf("testMergeRegionsConcurrently");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
List<HRegionInfo> tableRegions = createTable(tableName, 4);
@ -150,6 +154,7 @@ public class TestDispatchMergingRegionsProcedure {
regionsToMerge2[0] = tableRegions.get(2);
regionsToMerge2[1] = tableRegions.get(3);
final int initCompletedTaskCount = countOfCompletedMergeTaskCount();
long procId1 = procExec.submitProcedure(new DispatchMergingRegionsProcedure(
procExec.getEnvironment(), tableName, regionsToMerge1, true));
long procId2 = procExec.submitProcedure(new DispatchMergingRegionsProcedure(
@ -158,8 +163,28 @@ public class TestDispatchMergingRegionsProcedure {
ProcedureTestingUtility.waitProcedure(procExec, procId2);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId1);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
assertRegionCount(tableName, 2, 2, initCompletedTaskCount);
}
assertRegionCount(tableName, 2);
private void waitForCompletedMergeTask(int expectedTaskCount, int initCompletedTaskCount)
throws IOException, InterruptedException {
while (true) {
long currentCompletedTaskCount = countOfCompletedMergeTaskCount() - initCompletedTaskCount;
if (currentCompletedTaskCount == expectedTaskCount) {
return;
}
LOG.info("There are " + (expectedTaskCount - currentCompletedTaskCount) +
" merge requests are not completed, wait 100 ms");
TimeUnit.MILLISECONDS.sleep(100);
}
}
private static int countOfCompletedMergeTaskCount() {
int completedTaskCount = 0;
for (RegionServerThread server : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
completedTaskCount += server.getRegionServer().getCompactSplitThread().getCompletedMergeTaskCount();
}
return completedTaskCount;
}
@Test(timeout=60000)
@ -173,6 +198,7 @@ public class TestDispatchMergingRegionsProcedure {
regionsToMerge[0] = tableRegions.get(0);
regionsToMerge[1] = tableRegions.get(1);
final int initCompletedTaskCount = countOfCompletedMergeTaskCount();
long procId1 = procExec.submitProcedure(new DispatchMergingRegionsProcedure(
procExec.getEnvironment(), tableName, regionsToMerge, true), nonceGroup, nonce);
long procId2 = procExec.submitProcedure(new DispatchMergingRegionsProcedure(
@ -185,7 +211,7 @@ public class TestDispatchMergingRegionsProcedure {
ProcedureTestingUtility.waitProcedure(procExec, procId2);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
assertRegionCount(tableName, 2);
assertRegionCount(tableName, 2, 1, initCompletedTaskCount);
}
@Test(timeout=60000)
@ -202,6 +228,7 @@ public class TestDispatchMergingRegionsProcedure {
regionsToMerge[0] = tableRegions.get(0);
regionsToMerge[1] = tableRegions.get(1);
final int initCompletedTaskCount = countOfCompletedMergeTaskCount();
long procId = procExec.submitProcedure(
new DispatchMergingRegionsProcedure(
procExec.getEnvironment(), tableName, regionsToMerge, true));
@ -211,7 +238,7 @@ public class TestDispatchMergingRegionsProcedure {
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, numberOfSteps);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
assertRegionCount(tableName, 2);
assertRegionCount(tableName, 2, 1, initCompletedTaskCount);
}
@Test(timeout = 60000)
@ -256,6 +283,12 @@ public class TestDispatchMergingRegionsProcedure {
return tableRegions;
}
public List<HRegionInfo> assertRegionCount(final TableName tableName, final int nregions,
int expectedTaskCount, int initCompletedTaskCount) throws Exception {
waitForCompletedMergeTask(expectedTaskCount, initCompletedTaskCount);
return assertRegionCount(tableName, nregions);
}
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}