diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
index f419732eb85..9d33a212085 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
@@ -128,6 +128,9 @@ public class RegionServerTracker extends ZKListener {
     // '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
     splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
       forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
+    // Create a ServerNode for every possible live server found in the WAL directory
+    liveServersFromWALDir.stream()
+      .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
     watcher.registerListener(this);
     synchronized (this) {
       List<String> servers =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index e1ffda3b2fb..b4c474765fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -560,15 +560,18 @@ public class ServerManager {
       return false;
     }
     LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
-    master.getAssignmentManager().submitServerCrash(serverName, true);
-
-    // Tell our listeners that a server was removed
-    if (!this.listeners.isEmpty()) {
-      for (ServerListener listener : this.listeners) {
-        listener.serverRemoved(serverName);
+    long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
+    if (pid <= 0) {
+      return false;
+    } else {
+      // Tell our listeners that a server was removed
+      if (!this.listeners.isEmpty()) {
+        for (ServerListener listener : this.listeners) {
+          listener.serverRemoved(serverName);
+        }
       }
+      return true;
     }
-    return true;
   }
 
   @VisibleForTesting
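The ServerManager change above turns `expireServer` into a conditional operation: a non-positive pid from `submitServerCrash` now means no ServerCrashProcedure was actually scheduled, so listeners must not be notified and the caller sees `false`. A minimal standalone sketch of that contract (plain Java, not HBase code; the class and field names are illustrative stand-ins):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ExpireServerSketch {
  interface ServerListener { void serverRemoved(String serverName); }

  private final List<ServerListener> listeners = new ArrayList<>();
  private final AtomicLong pids = new AtomicLong(1);
  private boolean alreadyCrashed; // stands in for the ServerStateNode state check

  // Stands in for AssignmentManager.submitServerCrash: returns -1 for duplicates.
  long submitServerCrash(String serverName) {
    if (alreadyCrashed) {
      return -1;
    }
    alreadyCrashed = true;
    return pids.getAndIncrement();
  }

  boolean expireServer(String serverName) {
    long pid = submitServerCrash(serverName);
    if (pid <= 0) {
      return false; // duplicate: nothing was scheduled, listeners are not told
    }
    // Only a successfully scheduled SCP triggers listener notification.
    listeners.forEach(l -> l.serverRemoved(serverName));
    return true;
  }

  public static void main(String[] args) {
    ExpireServerSketch sm = new ExpireServerSketch();
    System.out.println(sm.expireServer("rs1,16020,1")); // true: SCP scheduled
    System.out.println(sm.expireServer("rs1,16020,1")); // false: duplicate rejected
  }
}
```

The design point is that the boolean result now carries real information; callers such as the new test below rely on `expireServer` returning `false` for a server that already has an SCP in flight.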
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index a564ea925a6..b7c2203051a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1343,24 +1343,36 @@ public class AssignmentManager {
   public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
     boolean carryingMeta;
     long pid;
-    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    ServerStateNode serverNode = regionStates.getServerNode(serverName);
+    if (serverNode == null) {
+      LOG.info("Skip adding SCP for {} since this server should be OFFLINE already", serverName);
+      return -1;
+    }
     // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
     // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
     // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
     // sure that, the region list fetched by SCP will not be changed any more.
     serverNode.writeLock().lock();
     try {
-      serverNode.setState(ServerState.CRASHED);
-      carryingMeta = isCarryingMeta(serverName);
       ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
-      pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
-        shouldSplitWal, carryingMeta));
+      carryingMeta = isCarryingMeta(serverName);
+      if (!serverNode.isInState(ServerState.ONLINE)) {
+        LOG.info(
+          "Skip adding SCP for {} with meta={}, since an SCP should be in progress " +
+            "or already done for this server node",
+          serverName, carryingMeta);
+        return -1;
+      } else {
+        serverNode.setState(ServerState.CRASHED);
+        pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
+          serverName, shouldSplitWal, carryingMeta));
+        LOG.info(
+          "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
+          serverName, carryingMeta, pid);
+      }
     } finally {
       serverNode.writeLock().unlock();
     }
-    LOG.info(
-      "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
-      serverName, carryingMeta, pid);
     return pid;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index 7b854095e3c..1470a5a8532 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -738,7 +738,8 @@ public class RegionStates {
     serverMap.remove(serverName);
   }
 
-  ServerStateNode getServerNode(final ServerName serverName) {
+  @VisibleForTesting
+  public ServerStateNode getServerNode(final ServerName serverName) {
    return serverMap.get(serverName);
   }
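The heart of the `submitServerCrash` change is a fencing pattern: take the server node's write lock, check `isInState(ONLINE)`, and let only the caller that wins the ONLINE to CRASHED transition schedule an SCP; every other caller gets -1. A minimal standalone sketch of that pattern (invented names, not the HBase implementation):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CrashFencingSketch {
  enum State { ONLINE, CRASHED, OFFLINE }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private State state = State.ONLINE;

  /** Returns a fake pid if this call won the transition, -1 otherwise. */
  long submitCrash() {
    lock.writeLock().lock();
    try {
      if (state != State.ONLINE) {
        return -1; // an SCP is already in progress or done for this server
      }
      state = State.CRASHED; // fences out late reportRegionStateTransition-style callers
      return 42;             // stand-in for procExec.submitProcedure(...)
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CrashFencingSketch node = new CrashFencingSketch();
    Runnable task = () -> System.out.println(
      Thread.currentThread().getName() + " -> pid " + node.submitCrash());
    Thread a = new Thread(task, "scp-a");
    Thread b = new Thread(task, "scp-b");
    a.start(); b.start();
    a.join(); b.join(); // exactly one thread prints a positive pid
  }
}
```

Switching from `getOrCreateServer` to `getServerNode` matters for the same reason: a missing node now means the server was never known or was already cleaned up, so no SCP should be created for it.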
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
index 3efe6e22662..c86a60ea445 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
@@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Server State.
  */
 @InterfaceAudience.Private
-enum ServerState {
+public enum ServerState {
   /**
    * Initial state. Available.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
index 6f763aad7c2..11883db86cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
@@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * State of Server; list of hosted regions, etc.
  */
 @InterfaceAudience.Private
-class ServerStateNode implements Comparable<ServerStateNode> {
+public class ServerStateNode implements Comparable<ServerStateNode> {
 
   private final Set<RegionStateNode> regions;
   private final ServerName serverName;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index b93f8fa9b46..05bcd28d1ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -333,17 +333,6 @@ public class ServerCrashProcedure
     return ServerOperationType.CRASH_HANDLER;
   }
 
-  /**
-   * For this procedure, yield at end of each successful flow step so that all crashed servers
-   * can make progress rather than do the default which has each procedure running to completion
-   * before we move to the next. For crashed servers, especially if running with distributed log
-   * replay, we will want all servers to come along; we do not want the scenario where a server is
-   * stuck waiting for regions to online so it can replay edits.
-   */
-  @Override
-  protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
-    return true;
-  }
 
   @Override
   protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
@@ -390,4 +379,9 @@ public class ServerCrashProcedure
   protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
     return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
   }
+
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    return true;
+  }
 }
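Overriding `holdLock` to return true makes the SCP keep its exclusive server lock for its whole lifetime instead of releasing and re-acquiring it around each step, which is also why the per-step yield override is removed above: yielding between steps is only safe for concurrency if the lock can be taken by someone else in between, and here it deliberately cannot. A small standalone sketch of the effect; `serverLock` and the step loop are illustrative stand-ins, not the procedure-v2 scheduler:

```java
import java.util.concurrent.locks.ReentrantLock;

public class HoldLockSketch {
  private final ReentrantLock serverLock = new ReentrantLock();

  // With holdLock=false semantics, the lock would be dropped between steps,
  // letting a second SCP for the same server interleave mid-procedure.
  void runAllStepsHoldingLock(String who, int steps) {
    serverLock.lock(); // acquired once, held until the last step finishes
    try {
      for (int i = 1; i <= steps; i++) {
        System.out.println(who + " step " + i);
      }
    } finally {
      serverLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    HoldLockSketch sketch = new HoldLockSketch();
    Thread a = new Thread(() -> sketch.runAllStepsHoldingLock("scp-a", 3));
    Thread b = new Thread(() -> sketch.runAllStepsHoldingLock("scp-b", 3));
    a.start(); b.start();
    a.join(); b.join(); // steps of a and b never interleave
  }
}
```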
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 949e1313af9..5516e11ce4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1187,6 +1187,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
    * @param servers number of region servers
    */
   public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
+    this.restartHBaseCluster(servers, null);
+  }
+
+  public void restartHBaseCluster(int servers, List<Integer> ports)
+      throws IOException, InterruptedException {
     if (hbaseAdmin != null) {
       hbaseAdmin.close();
       hbaseAdmin = null;
@@ -1195,7 +1200,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
       this.connection.close();
       this.connection = null;
     }
-    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
+    this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
     // Don't leave here till we've done a successful scan of the hbase:meta
     Connection conn = ConnectionFactory.createConnection(this.conf);
     Table t = conn.getTable(TableName.META_TABLE_NAME);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index 04720cdaa02..e031d3467a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +35,18 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,6 +74,63 @@ public class TestRestartCluster {
     UTIL.shutdownMiniCluster();
   }
 
+  @Test
+  public void testClusterRestartFailOver() throws Exception {
+    UTIL.startMiniCluster(3);
+    UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
+    // wait for all SCPs to finish
+    UTIL.waitFor(20000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+      .noneMatch(p -> p instanceof ServerCrashProcedure));
+    TableName tableName = TABLES[0];
+    ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    ServerStateNode serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager()
+      .getRegionStates().getServerNode(testServer);
+    Assert.assertNotNull(serverNode);
+    Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
+      serverNode.isInState(ServerState.ONLINE));
+    UTIL.createMultiRegionTable(tableName, FAMILY);
+    UTIL.waitTableEnabled(tableName);
+    Table table = UTIL.getConnection().getTable(tableName);
+    for (int i = 0; i < 100; i++) {
+      UTIL.loadTable(table, FAMILY);
+    }
+    List<Integer> ports =
+      UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
+        .map(serverName -> serverName.getPort()).collect(Collectors.toList());
+    LOG.info("Shutting down cluster");
+    UTIL.getHBaseCluster().killAll();
+    UTIL.getHBaseCluster().waitUntilShutDown();
+    LOG.info("Starting cluster the second time");
+    UTIL.restartHBaseCluster(3, ports);
+    UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
+    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+      .getServerNode(testServer);
+    Assert.assertNotNull("serverNode should not be null when restarting the whole cluster",
+      serverNode);
+    Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
+    LOG.info("start to find the SCP for the serverName we chose");
+    UTIL.waitFor(20000,
+      () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+        .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure)
+          && ((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
+    Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
+      serverNode.isInState(ServerState.ONLINE));
+    LOG.info("submit an SCP for the same serverName {}, which should fail", testServer);
+    Assert.assertFalse(
+      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+    Procedure<?> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+      .filter(p -> (p instanceof ServerCrashProcedure)
+        && ((ServerCrashProcedure) p).getServerName().equals(testServer))
+      .findAny().get();
+    UTIL.waitFor(20000, () -> procedure.isFinished());
+    LOG.info("even when the SCP is finished, a duplicate SCP should not be scheduled for {}",
+      testServer);
+    Assert.assertFalse(
+      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+      .getServerNode(testServer);
+    Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
+  }
+
   @Test
   public void testClusterRestart() throws Exception {
     UTIL.startMiniCluster(3);
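The test pins the region server ports across the restart (`restartHBaseCluster(3, ports)`) so the restarted JVMs come back on the same host:port while still getting fresh start codes; the pre-restart `ServerName` therefore stays dead and must be resolved by an SCP rather than being mistaken for a live server. A small sketch of that distinction, assuming the standard `ServerName` factory methods on the classpath (the `isSameAddress` helper is an assumption that may vary by HBase version):

```java
import org.apache.hadoop.hbase.ServerName;

public class ServerNameSketch {
  public static void main(String[] args) {
    // A ServerName is host,port,startcode. Same address, different startcode:
    ServerName before = ServerName.valueOf("rs1.example.com", 16020, 1000L);
    ServerName after = ServerName.valueOf("rs1.example.com", 16020, 2000L);
    System.out.println(before.equals(after));                    // false: distinct servers
    System.out.println(ServerName.isSameAddress(before, after)); // true: same host:port
  }
}
```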
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 0e4a84b5c76..af2076e2e82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@@ -170,6 +171,43 @@ public class TestServerCrashProcedure {
     }
   }
 
+  @Test
+  public void testConcurrentSCPForSameServer() throws Exception {
+    final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
+    try (Table t = createTable(tableName)) {
+      // Load the table with a bit of data so there are some logs to split and edits in each region.
+      this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
+      final int count = util.countRows(t);
+      assertTrue("expected some rows", count > 0);
+      // find the first server that matches the request and execute the test
+      ServerName rsToKill = null;
+      for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
+        final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
+        if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName)) {
+          rsToKill = serverName;
+          break;
+        }
+      }
+      HMaster master = util.getHBaseCluster().getMaster();
+      final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
+      ServerCrashProcedure procB =
+        new ServerCrashProcedure(pExecutor.getEnvironment(), rsToKill, false, false);
+      AssignmentTestingUtil.killRs(util, rsToKill);
+      long procId = getSCPProcId(pExecutor);
+      Procedure<?> procA = pExecutor.getProcedure(procId);
+      LOG.info("submit SCP procedureA");
+      util.waitFor(5000, () -> procA.hasLock());
+      LOG.info("procedureA acquired the lock");
+      assertEquals(Procedure.LockState.LOCK_EVENT_WAIT,
+        procB.acquireLock(pExecutor.getEnvironment()));
+      LOG.info("procedureB should not be able to get the lock");
+      util.waitFor(60000,
+        () -> procB.acquireLock(pExecutor.getEnvironment()) == Procedure.LockState.LOCK_ACQUIRED);
+      LOG.info("when procedureB gets the lock, procedureA should be finished");
+      assertTrue(procA.isFinished());
+    }
+  }
+
   protected void assertReplicaDistributed(final Table t) {
     return;
   }
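`testConcurrentSCPForSameServer` leans on the lock contract the `holdLock` override enforces: `acquireLock` answers `LOCK_ACQUIRED` to the procedure that owns the exclusive server lock and `LOCK_EVENT_WAIT` to a competing procedure for the same server, which then only gets the lock after the owner finishes. A minimal standalone model of that contract (an invented class, not the procedure-v2 API):

```java
public class LockStateSketch {
  enum LockState { LOCK_ACQUIRED, LOCK_EVENT_WAIT }

  private Long ownerProcId; // pid currently holding the exclusive server lock

  synchronized LockState acquireLock(long procId) {
    if (ownerProcId == null || ownerProcId == procId) {
      ownerProcId = procId;
      return LockState.LOCK_ACQUIRED; // owner (or re-entrant owner) may run
    }
    return LockState.LOCK_EVENT_WAIT; // competitor is parked until release
  }

  synchronized void releaseLock(long procId) {
    if (ownerProcId != null && ownerProcId == procId) {
      ownerProcId = null;
    }
  }

  public static void main(String[] args) {
    LockStateSketch lock = new LockStateSketch();
    System.out.println(lock.acquireLock(1)); // LOCK_ACQUIRED  (procA)
    System.out.println(lock.acquireLock(2)); // LOCK_EVENT_WAIT (procB queued)
    lock.releaseLock(1);                     // procA finishes
    System.out.println(lock.acquireLock(2)); // LOCK_ACQUIRED
  }
}
```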