HBASE-22964 Fix flaky TestClusterRestartFailover and TestClusterRestartFailoverSplitWithoutZk (#574)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
78bae9e3d0
commit
d8e5c87cf8
|
@ -1239,6 +1239,13 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
||||||
|
|
||||||
public void restartHBaseCluster(int servers, List<Integer> ports)
|
public void restartHBaseCluster(int servers, List<Integer> ports)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
StartMiniClusterOption option =
|
||||||
|
StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
|
||||||
|
restartHBaseCluster(option);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void restartHBaseCluster(StartMiniClusterOption option)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
if (hbaseAdmin != null) {
|
if (hbaseAdmin != null) {
|
||||||
hbaseAdmin.close();
|
hbaseAdmin.close();
|
||||||
hbaseAdmin = null;
|
hbaseAdmin = null;
|
||||||
|
@ -1247,7 +1254,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
||||||
this.asyncConnection.close();
|
this.asyncConnection.close();
|
||||||
this.asyncConnection = null;
|
this.asyncConnection = null;
|
||||||
}
|
}
|
||||||
this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
|
this.hbaseCluster =
|
||||||
|
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
|
||||||
|
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
|
||||||
// Don't leave here till we've done a successful scan of the hbase:meta
|
// Don't leave here till we've done a successful scan of the hbase:meta
|
||||||
Connection conn = ConnectionFactory.createConnection(this.conf);
|
Connection conn = ConnectionFactory.createConnection(this.conf);
|
||||||
Table t = conn.getTable(TableName.META_TABLE_NAME);
|
Table t = conn.getTable(TableName.META_TABLE_NAME);
|
||||||
|
|
|
@ -17,19 +17,32 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||||
import org.apache.hadoop.hbase.master.assignment.ServerState;
|
import org.apache.hadoop.hbase.master.assignment.ServerState;
|
||||||
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
|
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.junit.Assert;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -45,6 +58,9 @@ public class TestClusterRestartFailover extends AbstractTestRestartCluster {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);
|
||||||
|
|
||||||
|
private static CountDownLatch SCP_LATCH;
|
||||||
|
private static ServerName SERVER_FOR_TEST;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean splitWALCoordinatedByZk() {
|
protected boolean splitWALCoordinatedByZk() {
|
||||||
return true;
|
return true;
|
||||||
|
@ -55,60 +71,111 @@ public class TestClusterRestartFailover extends AbstractTestRestartCluster {
|
||||||
.getServerNode(serverName);
|
.getServerNode(serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for HBASE-22964
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void test() throws Exception {
|
||||||
UTIL.startMiniCluster(3);
|
setupCluster();
|
||||||
UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
|
setupTable();
|
||||||
// wait for all SCPs finished
|
|
||||||
UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
SERVER_FOR_TEST = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
|
||||||
.noneMatch(p -> p instanceof ServerCrashProcedure));
|
UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
|
||||||
TableName tableName = TABLES[0];
|
ServerStateNode serverNode = getServerStateNode(SERVER_FOR_TEST);
|
||||||
ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
|
assertNotNull(serverNode);
|
||||||
UTIL.waitFor(30000, () -> getServerStateNode(testServer) != null);
|
assertTrue("serverNode should be ONLINE when cluster runs normally",
|
||||||
ServerStateNode serverNode = getServerStateNode(testServer);
|
|
||||||
Assert.assertNotNull(serverNode);
|
|
||||||
Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
|
|
||||||
serverNode.isInState(ServerState.ONLINE));
|
serverNode.isInState(ServerState.ONLINE));
|
||||||
UTIL.createMultiRegionTable(tableName, FAMILY);
|
|
||||||
UTIL.waitTableEnabled(tableName);
|
SCP_LATCH = new CountDownLatch(1);
|
||||||
Table table = UTIL.getConnection().getTable(tableName);
|
|
||||||
for (int i = 0; i < 100; i++) {
|
// Shutdown cluster and restart
|
||||||
UTIL.loadTable(table, FAMILY);
|
|
||||||
}
|
|
||||||
List<Integer> ports =
|
List<Integer> ports =
|
||||||
UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
|
UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
|
||||||
.map(serverName -> serverName.getPort()).collect(Collectors.toList());
|
.map(serverName -> serverName.getPort()).collect(Collectors.toList());
|
||||||
LOG.info("Shutting down cluster");
|
LOG.info("Shutting down cluster");
|
||||||
UTIL.getHBaseCluster().killAll();
|
UTIL.getHBaseCluster().killAll();
|
||||||
UTIL.getHBaseCluster().waitUntilShutDown();
|
UTIL.getHBaseCluster().waitUntilShutDown();
|
||||||
LOG.info("Starting cluster the second time");
|
LOG.info("Restarting cluster");
|
||||||
UTIL.restartHBaseCluster(3, ports);
|
UTIL.restartHBaseCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
|
||||||
UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
|
.numMasters(1).numRegionServers(3).rsPorts(ports).build());
|
||||||
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
|
||||||
.getServerNode(testServer);
|
|
||||||
Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode);
|
UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
|
||||||
Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
|
serverNode = getServerStateNode(SERVER_FOR_TEST);
|
||||||
LOG.info("start to find the procedure of SCP for the severName we choose");
|
assertFalse("serverNode should not be ONLINE during SCP processing",
|
||||||
UTIL.waitFor(60000,
|
|
||||||
() -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
|
||||||
.anyMatch(procedure -> (procedure instanceof ServerCrashProcedure) &&
|
|
||||||
((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
|
|
||||||
Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
|
|
||||||
serverNode.isInState(ServerState.ONLINE));
|
serverNode.isInState(ServerState.ONLINE));
|
||||||
LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
|
Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
||||||
Assert
|
|
||||||
.assertFalse(UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
|
|
||||||
Procedure<?> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
|
||||||
.filter(p -> (p instanceof ServerCrashProcedure) &&
|
.filter(p -> (p instanceof ServerCrashProcedure) &&
|
||||||
((ServerCrashProcedure) p).getServerName().equals(testServer))
|
((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
|
||||||
.findAny().get();
|
assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
|
||||||
UTIL.waitFor(60000, () -> procedure.isFinished());
|
assertFalse("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
|
||||||
LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}",
|
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
|
||||||
testServer);
|
|
||||||
Assert
|
// Wait the SCP to finish
|
||||||
.assertFalse(UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
|
SCP_LATCH.countDown();
|
||||||
|
UTIL.waitFor(60000, () -> procedure.get().isFinished());
|
||||||
|
|
||||||
|
assertFalse("Even when the SCP is finished, the duplicate SCP should not be scheduled for " +
|
||||||
|
SERVER_FOR_TEST,
|
||||||
|
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
|
||||||
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||||
.getServerNode(testServer);
|
.getServerNode(SERVER_FOR_TEST);
|
||||||
Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
|
assertNull("serverNode should be deleted after SCP finished", serverNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupCluster() throws Exception {
|
||||||
|
UTIL.startMiniCluster(
|
||||||
|
StartMiniClusterOption.builder().masterClass(HMasterForTest.class).numMasters(1)
|
||||||
|
.numRegionServers(3).build());
|
||||||
|
UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
|
||||||
|
// wait for all SCPs finished
|
||||||
|
UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
||||||
|
.noneMatch(p -> p instanceof ServerCrashProcedure));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupTable() throws Exception {
|
||||||
|
TableName tableName = TABLES[0];
|
||||||
|
UTIL.createMultiRegionTable(tableName, FAMILY);
|
||||||
|
UTIL.waitTableAvailable(tableName);
|
||||||
|
Table table = UTIL.getConnection().getTable(tableName);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
UTIL.loadTable(table, FAMILY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class HMasterForTest extends HMaster {
|
||||||
|
|
||||||
|
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||||
|
return new AssignmentManagerForTest(master);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||||
|
|
||||||
|
public AssignmentManagerForTest(MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
|
||||||
|
List<RegionInfo> regions = super.getRegionsOnServer(serverName);
|
||||||
|
// ServerCrashProcedure will call this method, so wait the CountDownLatch here
|
||||||
|
if (SCP_LATCH != null && SERVER_FOR_TEST != null && serverName.equals(SERVER_FOR_TEST)) {
|
||||||
|
try {
|
||||||
|
LOG.info("ServerCrashProcedure wait the CountDownLatch here");
|
||||||
|
SCP_LATCH.await();
|
||||||
|
LOG.info("Continue the ServerCrashProcedure");
|
||||||
|
SCP_LATCH = null;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return regions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue