HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures(SCP) for a same server

This commit is contained in:
Jingyun Tian 2018-12-17 19:32:23 +08:00 committed by Jingyun Tian
parent f78284685f
commit c448604ceb
10 changed files with 155 additions and 34 deletions

View File

@ -128,6 +128,9 @@ public class RegionServerTracker extends ZKListener {
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
//create ServerNode for all possible live servers from wal directory
liveServersFromWALDir.stream()
.forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
watcher.registerListener(this);
synchronized (this) {
List<String> servers =

View File

@ -602,19 +602,22 @@ public class ServerManager {
return false;
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
master.getAssignmentManager().submitServerCrash(serverName, true);
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
for (ServerListener listener : this.listeners) {
listener.serverRemoved(serverName);
long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
if(pid <= 0) {
return false;
} else {
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
for (ServerListener listener : this.listeners) {
listener.serverRemoved(serverName);
}
}
// trigger a persist of flushedSeqId
if (flushedSeqIdFlusher != null) {
flushedSeqIdFlusher.triggerNow();
}
return true;
}
// trigger a persist of flushedSeqId
if (flushedSeqIdFlusher != null) {
flushedSeqIdFlusher.triggerNow();
}
return true;
}
@VisibleForTesting

View File

@ -1343,24 +1343,36 @@ public class AssignmentManager {
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
boolean carryingMeta;
long pid;
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
ServerStateNode serverNode = regionStates.getServerNode(serverName);
if(serverNode == null){
LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
return -1;
}
// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
// sure that, the region list fetched by SCP will not be changed any more.
serverNode.writeLock().lock();
try {
serverNode.setState(ServerState.CRASHED);
carryingMeta = isCarryingMeta(serverName);
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
shouldSplitWal, carryingMeta));
carryingMeta = isCarryingMeta(serverName);
if (!serverNode.isInState(ServerState.ONLINE)) {
LOG.info(
"Skip to add SCP for {} with meta= {}, " +
"since there should be a SCP is processing or already done for this server node",
serverName, carryingMeta);
return -1;
} else {
serverNode.setState(ServerState.CRASHED);
pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
serverName, shouldSplitWal, carryingMeta));
LOG.info(
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
serverName, carryingMeta, pid);
}
} finally {
serverNode.writeLock().unlock();
}
LOG.info(
"Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
serverName, carryingMeta, pid);
return pid;
}

View File

@ -738,7 +738,8 @@ public class RegionStates {
serverMap.remove(serverName);
}
ServerStateNode getServerNode(final ServerName serverName) {
@VisibleForTesting
public ServerStateNode getServerNode(final ServerName serverName) {
return serverMap.get(serverName);
}

View File

@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Server State.
*/
@InterfaceAudience.Private
enum ServerState {
public enum ServerState {
/**
* Initial state. Available.
*/

View File

@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* State of Server; list of hosted regions, etc.
*/
@InterfaceAudience.Private
class ServerStateNode implements Comparable<ServerStateNode> {
public class ServerStateNode implements Comparable<ServerStateNode> {
private final Set<RegionStateNode> regions;
private final ServerName serverName;

View File

@ -333,17 +333,6 @@ public class ServerCrashProcedure
return ServerOperationType.CRASH_HANDLER;
}
/**
* For this procedure, yield at end of each successful flow step so that all crashed servers
* can make progress rather than do the default which has each procedure running to completion
* before we move to the next. For crashed servers, especially if running with distributed log
* replay, we will want all servers to come along; we do not want the scenario where a server is
* stuck waiting for regions to online so it can replay edits.
*/
@Override
protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
return true;
}
@Override
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
@ -390,4 +379,9 @@ public class ServerCrashProcedure
protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
}
@Override
protected boolean holdLock(MasterProcedureEnv env) {
return true;
}
}

View File

@ -1187,6 +1187,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* @param servers number of region servers
*/
public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
this.restartHBaseCluster(servers, null);
}
public void restartHBaseCluster(int servers, List<Integer> ports)
throws IOException, InterruptedException {
if (hbaseAdmin != null) {
hbaseAdmin.close();
hbaseAdmin = null;
@ -1195,7 +1200,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
this.connection.close();
this.connection = null;
}
this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -33,12 +35,18 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.ServerState;
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -66,6 +74,63 @@ public class TestRestartCluster {
UTIL.shutdownMiniCluster();
}
@Test
public void testClusterRestartFailOver() throws Exception {
UTIL.startMiniCluster(3);
UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
//wait for all SCPs finished
UTIL.waitFor(20000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
.noneMatch(p -> p instanceof ServerCrashProcedure));
TableName tableName = TABLES[0];
ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
ServerStateNode serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getServerNode(testServer);
Assert.assertNotNull(serverNode);
Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
serverNode.isInState(ServerState.ONLINE));
UTIL.createMultiRegionTable(tableName, FAMILY);
UTIL.waitTableEnabled(tableName);
Table table = UTIL.getConnection().getTable(tableName);
for (int i = 0; i < 100; i++) {
UTIL.loadTable(table, FAMILY);
}
List<Integer> ports =
UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
.map(serverName -> serverName.getPort()).collect(Collectors.toList());
LOG.info("Shutting down cluster");
UTIL.getHBaseCluster().killAll();
UTIL.getHBaseCluster().waitUntilShutDown();
LOG.info("Starting cluster the second time");
UTIL.restartHBaseCluster(3, ports);
UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
.getServerNode(testServer);
Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode);
Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
LOG.info("start to find the procedure of SCP for the severName we choose");
UTIL.waitFor(20000,
() -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
.anyMatch(procedure -> (procedure instanceof ServerCrashProcedure)
&& ((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
serverNode.isInState(ServerState.ONLINE));
LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
Assert.assertFalse(
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
Procedure procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> (p instanceof ServerCrashProcedure)
&& ((ServerCrashProcedure) p).getServerName().equals(testServer))
.findAny().get();
UTIL.waitFor(20000, () -> procedure.isFinished());
LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}",
testServer);
Assert.assertFalse(
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
.getServerNode(testServer);
Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
}
@Test
public void testClusterRestart() throws Exception {
UTIL.startMiniCluster(3);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@ -170,6 +171,43 @@ public class TestServerCrashProcedure {
}
}
@Test
public void testConcurrentSCPForSameServer() throws Exception {
final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
try (Table t = createTable(tableName)) {
// Load the table with a bit of data so some logs to split and some edits in each region.
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
final int count = util.countRows(t);
assertTrue("expected some rows", count > 0);
// find the first server that match the request and executes the test
ServerName rsToKill = null;
for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == true) {
rsToKill = serverName;
break;
}
}
HMaster master = util.getHBaseCluster().getMaster();
final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
ServerCrashProcedure procB =
new ServerCrashProcedure(pExecutor.getEnvironment(), rsToKill, false, false);
AssignmentTestingUtil.killRs(util, rsToKill);
long procId = getSCPProcId(pExecutor);
Procedure procA = pExecutor.getProcedure(procId);
LOG.info("submit SCP procedureA");
util.waitFor(5000, () -> procA.hasLock());
LOG.info("procedureA acquired the lock");
assertEquals(Procedure.LockState.LOCK_EVENT_WAIT,
procB.acquireLock(pExecutor.getEnvironment()));
LOG.info("procedureB should not be able to get the lock");
util.waitFor(60000,
() -> procB.acquireLock(pExecutor.getEnvironment()) == Procedure.LockState.LOCK_ACQUIRED);
LOG.info("when procedure B get the lock, procedure A should be finished");
assertTrue(procA.isFinished());
}
}
protected void assertReplicaDistributed(final Table t) {
return;
}