HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures(SCP) for a same server
This commit is contained in:
parent
d2832c1708
commit
83e12153d1
|
@ -128,6 +128,9 @@ public class RegionServerTracker extends ZKListener {
|
||||||
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
|
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
|
||||||
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
|
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
|
||||||
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
|
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
|
||||||
|
//create ServerNode for all possible live servers from wal directory
|
||||||
|
liveServersFromWALDir.stream()
|
||||||
|
.forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
|
||||||
watcher.registerListener(this);
|
watcher.registerListener(this);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
List<String> servers =
|
List<String> servers =
|
||||||
|
|
|
@ -560,8 +560,10 @@ public class ServerManager {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
|
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
|
||||||
master.getAssignmentManager().submitServerCrash(serverName, true);
|
long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
|
||||||
|
if (pid <= 0) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
// Tell our listeners that a server was removed
|
// Tell our listeners that a server was removed
|
||||||
if (!this.listeners.isEmpty()) {
|
if (!this.listeners.isEmpty()) {
|
||||||
for (ServerListener listener : this.listeners) {
|
for (ServerListener listener : this.listeners) {
|
||||||
|
@ -570,6 +572,7 @@ public class ServerManager {
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void moveFromOnlineToDeadServers(final ServerName sn) {
|
public void moveFromOnlineToDeadServers(final ServerName sn) {
|
||||||
|
|
|
@ -1343,24 +1343,36 @@ public class AssignmentManager {
|
||||||
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
|
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
|
||||||
boolean carryingMeta;
|
boolean carryingMeta;
|
||||||
long pid;
|
long pid;
|
||||||
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
ServerStateNode serverNode = regionStates.getServerNode(serverName);
|
||||||
|
if(serverNode == null){
|
||||||
|
LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
|
// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
|
||||||
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
|
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
|
||||||
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
|
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
|
||||||
// sure that, the region list fetched by SCP will not be changed any more.
|
// sure that, the region list fetched by SCP will not be changed any more.
|
||||||
serverNode.writeLock().lock();
|
serverNode.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
serverNode.setState(ServerState.CRASHED);
|
|
||||||
carryingMeta = isCarryingMeta(serverName);
|
|
||||||
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
|
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
|
||||||
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
|
carryingMeta = isCarryingMeta(serverName);
|
||||||
shouldSplitWal, carryingMeta));
|
if (!serverNode.isInState(ServerState.ONLINE)) {
|
||||||
} finally {
|
LOG.info(
|
||||||
serverNode.writeLock().unlock();
|
"Skip to add SCP for {} with meta= {}, " +
|
||||||
}
|
"since there should be a SCP is processing or already done for this server node",
|
||||||
|
serverName, carryingMeta);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
serverNode.setState(ServerState.CRASHED);
|
||||||
|
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
|
||||||
|
serverName, shouldSplitWal, carryingMeta));
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
|
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
|
||||||
serverName, carryingMeta, pid);
|
serverName, carryingMeta, pid);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
serverNode.writeLock().unlock();
|
||||||
|
}
|
||||||
return pid;
|
return pid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -738,7 +738,8 @@ public class RegionStates {
|
||||||
serverMap.remove(serverName);
|
serverMap.remove(serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
ServerStateNode getServerNode(final ServerName serverName) {
|
@VisibleForTesting
|
||||||
|
public ServerStateNode getServerNode(final ServerName serverName) {
|
||||||
return serverMap.get(serverName);
|
return serverMap.get(serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
* Server State.
|
* Server State.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
enum ServerState {
|
public enum ServerState {
|
||||||
/**
|
/**
|
||||||
* Initial state. Available.
|
* Initial state. Available.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
* State of Server; list of hosted regions, etc.
|
* State of Server; list of hosted regions, etc.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class ServerStateNode implements Comparable<ServerStateNode> {
|
public class ServerStateNode implements Comparable<ServerStateNode> {
|
||||||
|
|
||||||
private final Set<RegionStateNode> regions;
|
private final Set<RegionStateNode> regions;
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
|
|
|
@ -333,17 +333,6 @@ public class ServerCrashProcedure
|
||||||
return ServerOperationType.CRASH_HANDLER;
|
return ServerOperationType.CRASH_HANDLER;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* For this procedure, yield at end of each successful flow step so that all crashed servers
|
|
||||||
* can make progress rather than do the default which has each procedure running to completion
|
|
||||||
* before we move to the next. For crashed servers, especially if running with distributed log
|
|
||||||
* replay, we will want all servers to come along; we do not want the scenario where a server is
|
|
||||||
* stuck waiting for regions to online so it can replay edits.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
|
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
|
||||||
|
@ -390,4 +379,9 @@ public class ServerCrashProcedure
|
||||||
protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
|
protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
|
||||||
return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
|
return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean holdLock(MasterProcedureEnv env) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1187,6 +1187,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
||||||
* @param servers number of region servers
|
* @param servers number of region servers
|
||||||
*/
|
*/
|
||||||
public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
|
public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
|
||||||
|
this.restartHBaseCluster(servers, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void restartHBaseCluster(int servers, List<Integer> ports)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
if (hbaseAdmin != null) {
|
if (hbaseAdmin != null) {
|
||||||
hbaseAdmin.close();
|
hbaseAdmin.close();
|
||||||
hbaseAdmin = null;
|
hbaseAdmin = null;
|
||||||
|
@ -1195,7 +1200,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
||||||
this.connection.close();
|
this.connection.close();
|
||||||
this.connection = null;
|
this.connection = null;
|
||||||
}
|
}
|
||||||
this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
|
this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
|
||||||
// Don't leave here till we've done a successful scan of the hbase:meta
|
// Don't leave here till we've done a successful scan of the hbase:meta
|
||||||
Connection conn = ConnectionFactory.createConnection(this.conf);
|
Connection conn = ConnectionFactory.createConnection(this.conf);
|
||||||
Table t = conn.getTable(TableName.META_TABLE_NAME);
|
Table t = conn.getTable(TableName.META_TABLE_NAME);
|
||||||
|
|
|
@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -33,12 +35,18 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableExistsException;
|
import org.apache.hadoop.hbase.TableExistsException;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.ServerState;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -66,6 +74,63 @@ public class TestRestartCluster {
|
||||||
UTIL.shutdownMiniCluster();
|
UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClusterRestartFailOver() throws Exception {
|
||||||
|
UTIL.startMiniCluster(3);
|
||||||
|
UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
|
||||||
|
//wait for all SCPs finished
|
||||||
|
UTIL.waitFor(20000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
||||||
|
.noneMatch(p -> p instanceof ServerCrashProcedure));
|
||||||
|
TableName tableName = TABLES[0];
|
||||||
|
ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
|
||||||
|
ServerStateNode serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager()
|
||||||
|
.getRegionStates().getServerNode(testServer);
|
||||||
|
Assert.assertNotNull(serverNode);
|
||||||
|
Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
|
||||||
|
serverNode.isInState(ServerState.ONLINE));
|
||||||
|
UTIL.createMultiRegionTable(tableName, FAMILY);
|
||||||
|
UTIL.waitTableEnabled(tableName);
|
||||||
|
Table table = UTIL.getConnection().getTable(tableName);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
UTIL.loadTable(table, FAMILY);
|
||||||
|
}
|
||||||
|
List<Integer> ports =
|
||||||
|
UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
|
||||||
|
.map(serverName -> serverName.getPort()).collect(Collectors.toList());
|
||||||
|
LOG.info("Shutting down cluster");
|
||||||
|
UTIL.getHBaseCluster().killAll();
|
||||||
|
UTIL.getHBaseCluster().waitUntilShutDown();
|
||||||
|
LOG.info("Starting cluster the second time");
|
||||||
|
UTIL.restartHBaseCluster(3, ports);
|
||||||
|
UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
|
||||||
|
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||||
|
.getServerNode(testServer);
|
||||||
|
Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode);
|
||||||
|
Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
|
||||||
|
LOG.info("start to find the procedure of SCP for the severName we choose");
|
||||||
|
UTIL.waitFor(20000,
|
||||||
|
() -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
||||||
|
.anyMatch(procedure -> (procedure instanceof ServerCrashProcedure)
|
||||||
|
&& ((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
|
||||||
|
Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
|
||||||
|
serverNode.isInState(ServerState.ONLINE));
|
||||||
|
LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
|
||||||
|
Assert.assertFalse(
|
||||||
|
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
|
||||||
|
Procedure procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
|
||||||
|
.filter(p -> (p instanceof ServerCrashProcedure)
|
||||||
|
&& ((ServerCrashProcedure) p).getServerName().equals(testServer))
|
||||||
|
.findAny().get();
|
||||||
|
UTIL.waitFor(20000, () -> procedure.isFinished());
|
||||||
|
LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}",
|
||||||
|
testServer);
|
||||||
|
Assert.assertFalse(
|
||||||
|
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
|
||||||
|
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||||
|
.getServerNode(testServer);
|
||||||
|
Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClusterRestart() throws Exception {
|
public void testClusterRestart() throws Exception {
|
||||||
UTIL.startMiniCluster(3);
|
UTIL.startMiniCluster(3);
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
|
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
|
@ -170,6 +171,43 @@ public class TestServerCrashProcedure {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcurrentSCPForSameServer() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
|
||||||
|
try (Table t = createTable(tableName)) {
|
||||||
|
// Load the table with a bit of data so some logs to split and some edits in each region.
|
||||||
|
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
|
||||||
|
final int count = util.countRows(t);
|
||||||
|
assertTrue("expected some rows", count > 0);
|
||||||
|
// find the first server that match the request and executes the test
|
||||||
|
ServerName rsToKill = null;
|
||||||
|
for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
|
||||||
|
final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
|
||||||
|
if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == true) {
|
||||||
|
rsToKill = serverName;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
HMaster master = util.getHBaseCluster().getMaster();
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
|
||||||
|
ServerCrashProcedure procB =
|
||||||
|
new ServerCrashProcedure(pExecutor.getEnvironment(), rsToKill, false, false);
|
||||||
|
AssignmentTestingUtil.killRs(util, rsToKill);
|
||||||
|
long procId = getSCPProcId(pExecutor);
|
||||||
|
Procedure procA = pExecutor.getProcedure(procId);
|
||||||
|
LOG.info("submit SCP procedureA");
|
||||||
|
util.waitFor(5000, () -> procA.hasLock());
|
||||||
|
LOG.info("procedureA acquired the lock");
|
||||||
|
assertEquals(Procedure.LockState.LOCK_EVENT_WAIT,
|
||||||
|
procB.acquireLock(pExecutor.getEnvironment()));
|
||||||
|
LOG.info("procedureB should not be able to get the lock");
|
||||||
|
util.waitFor(60000,
|
||||||
|
() -> procB.acquireLock(pExecutor.getEnvironment()) == Procedure.LockState.LOCK_ACQUIRED);
|
||||||
|
LOG.info("when procedure B get the lock, procedure A should be finished");
|
||||||
|
assertTrue(procA.isFinished());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertReplicaDistributed(final Table t) {
|
protected void assertReplicaDistributed(final Table t) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue