HBASE-19671 Fix TestMultiParallel#testActiveThreadsCount
This commit is contained in:
parent
6708d54478
commit
6a0e6fefd3
|
@ -27,11 +27,14 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -39,7 +42,13 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -80,11 +89,14 @@ public class TestMultiParallel {
|
|||
KeyValueCodec.class.getCanonicalName());
|
||||
UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
|
||||
UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
|
||||
UTIL.getConfiguration()
|
||||
.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName());
|
||||
UTIL.startMiniCluster(slaves);
|
||||
Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
|
||||
UTIL.waitTableEnabled(TEST_TABLE);
|
||||
t.close();
|
||||
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
assertTrue(MyMasterObserver.start.get());
|
||||
}
|
||||
|
||||
@AfterClass public static void afterClass() throws Exception {
|
||||
|
@ -93,13 +105,22 @@ public class TestMultiParallel {
|
|||
}
|
||||
|
||||
@Before public void before() throws Exception {
|
||||
final int balanceCount = MyMasterObserver.postBalanceCount.get();
|
||||
LOG.info("before");
|
||||
if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
|
||||
// Distribute regions
|
||||
UTIL.getMiniHBaseCluster().getMaster().balance();
|
||||
// Some plans are created.
|
||||
if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
|
||||
// It is necessary to wait the move procedure to start.
|
||||
// Otherwise, the next wait may pass immediately.
|
||||
UTIL.waitFor(3 * 1000, 100, false, () ->
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()
|
||||
);
|
||||
}
|
||||
|
||||
// Wait until completing balance
|
||||
UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
|
||||
UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
|
||||
}
|
||||
LOG.info("before done");
|
||||
}
|
||||
|
@ -778,4 +799,27 @@ public class TestMultiParallel {
|
|||
validateEmpty(result);
|
||||
}
|
||||
}
|
||||
|
||||
public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
|
||||
private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
|
||||
private static final AtomicBoolean start = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException {
|
||||
start.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MasterObserver> getMasterObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
List<RegionPlan> plans) throws IOException {
|
||||
if (!plans.isEmpty()) {
|
||||
postBalanceCount.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue