diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 14ab6c903bf..8e446403f22 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -27,11 +27,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; - +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -39,7 +42,13 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -80,11 +89,14 @@ public class TestMultiParallel { KeyValueCodec.class.getCanonicalName()); UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); + UTIL.getConfiguration() + .set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName()); UTIL.startMiniCluster(slaves); Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); UTIL.waitTableEnabled(TEST_TABLE); t.close(); CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); + assertTrue(MyMasterObserver.start.get()); } @AfterClass public static void afterClass() throws Exception { @@ -93,13 +105,22 @@ public class TestMultiParallel { } @Before public void before() throws Exception { + final int balanceCount = MyMasterObserver.postBalanceCount.get(); LOG.info("before"); if (UTIL.ensureSomeRegionServersAvailable(slaves)) { // Distribute regions UTIL.getMiniHBaseCluster().getMaster().balance(); + // Some plans are created. + if (MyMasterObserver.postBalanceCount.get() > balanceCount) { + // It is necessary to wait the move procedure to start. + // Otherwise, the next wait may pass immediately. + UTIL.waitFor(3 * 1000, 100, false, () -> + UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition() + ); + } // Wait until completing balance - UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); + UTIL.waitUntilAllRegionsAssigned(TEST_TABLE); } LOG.info("before done"); } @@ -778,4 +799,27 @@ public class TestMultiParallel { validateEmpty(result); } } + + public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { + private static final AtomicInteger postBalanceCount = new AtomicInteger(0); + private static final AtomicBoolean start = new AtomicBoolean(false); + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + start.set(true); + } + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + + @Override + public void postBalance(final ObserverContext ctx, + List plans) throws IOException { + if (!plans.isEmpty()) { + postBalanceCount.incrementAndGet(); + } + } + } }