diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 5386f6edd3c..78294a08fe0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -37,8 +37,8 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1985,7 +1984,7 @@ public class AssignmentManager extends ZooKeeperListener {
       // ClosedRegionhandler can remove the server from this.regions
       if (!serverManager.isServerOnline(server)) {
         LOG.debug("Offline " + region.getRegionNameAsString()
-          + ", no need to unassign since it's on a dead server: " + server);
+            + ", no need to unassign since it's on a dead server: " + server);
         if (transitionInZK) {
           // delete the node. if no node exists need not bother.
           deleteClosingOrClosedNode(region, server);
@@ -1995,42 +1994,39 @@ public class AssignmentManager extends ZooKeeperListener {
         }
         return;
       }
+      long sleepTime = 0;
       try {
         // Send CLOSE RPC
-        if (serverManager.sendRegionClose(server, region,
-          versionOfClosingNode, dest, transitionInZK)) {
-          LOG.debug("Sent CLOSE to " + server + " for region " +
-            region.getRegionNameAsString());
+        if (serverManager.sendRegionClose(server, region, versionOfClosingNode, dest,
+          transitionInZK)) {
+          LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString());
           if (useZKForAssignment && !transitionInZK && state != null) {
             // Retry to make sure the region is
             // closed so as to avoid double assignment.
-            unassign(region, state, versionOfClosingNode,
-              dest, transitionInZK, src);
+            unassign(region, state, versionOfClosingNode, dest, transitionInZK, src);
           }
           return;
         }
 
         // This never happens. Currently regionserver close always return true.
         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
-        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
-          region.getRegionNameAsString());
+        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
+            region.getRegionNameAsString());
       } catch (Throwable t) {
-        long sleepTime = 0;
         Configuration conf = this.server.getConfiguration();
         if (t instanceof RemoteException) {
-          t = ((RemoteException)t).unwrapRemoteException();
+          t = ((RemoteException) t).unwrapRemoteException();
        }
        boolean logRetries = true;
-        if (t instanceof RegionServerAbortedException
-            || t instanceof RegionServerStoppedException
+        if (t instanceof RegionServerStoppedException
            || t instanceof ServerNotRunningYetException) {
          // RS is aborting or stopping, we cannot offline the region since the region may need
-          // to do WAL recovery. Until we see  the RS expiration, we should retry.
+          // to do WAL recovery. Until we see the RS expiration, we should retry.
          sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
        } else if (t instanceof NotServingRegionException) {
-          LOG.debug("Offline " + region.getRegionNameAsString()
-            + ", it's not any more on " + server, t);
+          LOG.debug(
+            "Offline " + region.getRegionNameAsString() + ", it's not any more on " + server, t);
          if (transitionInZK) {
            deleteClosingOrClosedNode(region, server);
          }
@@ -2038,25 +2034,24 @@ public class AssignmentManager extends ZooKeeperListener {
            regionOffline(region);
          }
          return;
-        } else if ((t instanceof FailedServerException) || (state != null &&
-            t instanceof RegionAlreadyInTransitionException)) {
+        } else if ((t instanceof FailedServerException)
+            || (state != null && t instanceof RegionAlreadyInTransitionException)) {
          if (t instanceof FailedServerException) {
            sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+                RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
          } else {
            // RS is already processing this region, only need to update the timestamp
            LOG.debug("update " + state + " the timestamp.");
            state.updateTimestampToNow();
            if (maxWaitTime < 0) {
-              maxWaitTime =
-                EnvironmentEdgeManager.currentTime()
-                  + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
-                    DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
+              maxWaitTime = EnvironmentEdgeManager.currentTime() + conf.getLong(
+                ALREADY_IN_TRANSITION_WAITTIME, DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
            }
            long now = EnvironmentEdgeManager.currentTime();
            if (now < maxWaitTime) {
-              LOG.debug("Region is already in transition; "
-                + "waiting up to " + (maxWaitTime - now) + "ms", t);
+              LOG.debug("Region is already in transition; " + "waiting up to "
+                  + (maxWaitTime - now) + "ms",
+                t);
              sleepTime = 100;
              i--; // reset the try count
              logRetries = false;
@@ -2064,28 +2059,40 @@
            }
          }
 
-        try {
-          if (sleepTime > 0) {
-            Thread.sleep(sleepTime);
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn("Failed to unassign "
-            + region.getRegionNameAsString() + " since interrupted", ie);
-          Thread.currentThread().interrupt();
-          if (state != null) {
-            regionStates.updateRegionState(region, State.FAILED_CLOSE);
-          }
-          return;
-        }
-
        if (logRetries) {
-          LOG.info("Server " + server + " returned " + t + " for "
-            + region.getRegionNameAsString() + ", try=" + i
-            + " of " + this.maximumAttempts, t);
+          LOG.info("Server " + server + " returned " + t + " for " + region.getRegionNameAsString()
+              + ", try=" + i + " of " + this.maximumAttempts,
+            t);
          // Presume retry or server will expire.
        }
      }
+      // sleepTime is set in one of the following cases (reasons commented above):
+      // 1. Region server stopping or aborting
+      // 2. Region already in transition
+      // 3. Connecting to a server that is already dead
+      //
+      // If sleepTime is not set by any of the above cases, sleep for the
+      // configured exponential backoff time
+      if (sleepTime == 0 && i != maximumAttempts) {
+        sleepTime = backoffPolicy.getBackoffTime(retryConfig, i);
+        LOG.info("Waiting for " + sleepTime + " milliseconds of exponential backoff time for "
+            + region.getRegionNameAsString() + " before next retry " + (i + 1) + " of "
+            + this.maximumAttempts);
+      }
+      try {
+        if (sleepTime > 0 && i != maximumAttempts) {
+          Thread.sleep(sleepTime);
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Failed to unassign " + region.getRegionNameAsString() + " since interrupted", ie);
+        if (state != null) {
+          regionStates.updateRegionState(region, State.FAILED_CLOSE);
+        }
+        Thread.currentThread().interrupt();
+        return;
+      }
    }
+    // Ran out of attempts
    if (state != null) {
      regionStates.updateRegionState(region, State.FAILED_CLOSE);
@@ -2108,54 +2115,57 @@ public class AssignmentManager extends ZooKeeperListener {
      LOG.debug("Force region state offline " + state);
    }
 
-    switch (state.getState()) {
-    case OPEN:
-    case OPENING:
-    case PENDING_OPEN:
-    case CLOSING:
-    case PENDING_CLOSE:
-      if (!forceNewPlan) {
-        LOG.debug("Skip assigning " +
-          region + ", it is already " + state);
-        return null;
+    // We need a lock on the region as we could update it
+    Lock lock = locker.acquireLock(region.getEncodedName());
+    try {
+      switch (state.getState()) {
+        case OPEN:
+        case OPENING:
+        case PENDING_OPEN:
+        case CLOSING:
+        case PENDING_CLOSE:
+          if (!forceNewPlan) {
+            LOG.debug("Skip assigning " + region + ", it is already " + state);
+            return null;
+          }
+        case FAILED_CLOSE:
+        case FAILED_OPEN:
+          unassign(region, state, -1, null, false, null);
+          state = regionStates.getRegionState(region);
+          if (state.isFailedClose()) {
+            // If we can't close the region, we can't re-assign
+            // it so as to avoid possible double assignment/data loss.
+            LOG.info("Skip assigning " + region + ", we couldn't close it: " + state);
+            return null;
+          }
+        case OFFLINE:
+          // This region could have been open on this server
+          // for a while. If the server is dead and not processed
+          // yet, we can move on only if the meta shows the
+          // region is not on this server actually, or on a server
+          // not dead, or dead and processed already.
+          // In case not using ZK, we don't need this check because
+          // we have the latest info in memory, and the caller
+          // will do another round checking any way.
+          if (useZKForAssignment && regionStates.isServerDeadAndNotProcessed(sn)
+              && wasRegionOnDeadServerByMeta(region, sn)) {
+            if (!regionStates.isRegionInTransition(region)) {
+              LOG.info(
+                "Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
+              regionStates.updateRegionState(region, State.OFFLINE);
+            }
+            LOG.info("Skip assigning " + region.getRegionNameAsString()
+                + ", it is on a dead but not processed yet server: " + sn);
+            return null;
+          }
+        case CLOSED:
+          break;
+        default:
+          LOG.error("Trying to assign region " + region + ", which is " + state);
+          return null;
      }
-    case FAILED_CLOSE:
-    case FAILED_OPEN:
-      unassign(region, state, -1, null, false, null);
-      state = regionStates.getRegionState(region);
-      if (state.isFailedClose()) {
-        // If we can't close the region, we can't re-assign
-        // it so as to avoid possible double assignment/data loss.
-        LOG.info("Skip assigning " +
-          region + ", we couldn't close it: " + state);
-        return null;
-      }
-    case OFFLINE:
-      // This region could have been open on this server
-      // for a while. If the server is dead and not processed
-      // yet, we can move on only if the meta shows the
-      // region is not on this server actually, or on a server
-      // not dead, or dead and processed already.
-      // In case not using ZK, we don't need this check because
-      // we have the latest info in memory, and the caller
-      // will do another round checking any way.
-      if (useZKForAssignment
-        && regionStates.isServerDeadAndNotProcessed(sn)
-        && wasRegionOnDeadServerByMeta(region, sn)) {
-        if (!regionStates.isRegionInTransition(region)) {
-          LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
-          regionStates.updateRegionState(region, State.OFFLINE);
-        }
-        LOG.info("Skip assigning " + region.getRegionNameAsString()
-          + ", it is on a dead but not processed yet server: " + sn);
-        return null;
-      }
-    case CLOSED:
-      break;
-    default:
-      LOG.error("Trying to assign region " + region
-        + ", which is " + state);
-      return null;
+    } finally {
+      lock.unlock();
     }
     return state;
   }
@@ -3623,7 +3633,7 @@ public class AssignmentManager extends ZooKeeperListener {
     if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) {
       // Sleep before reassigning if this region has failed to open before
       long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
-        getFailedAttempts(regionInfo.getEncodedName()));
+          getFailedAttempts(regionInfo.getEncodedName()));
       invokeAssignLater(regionInfo, forceNewPlan, sleepTime);
     } else {
       // Immediately reassign if this region has never failed an open before
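The retry sleep introduced above comes from backoffPolicy.getBackoffTime(retryConfig, i), seeded by the "hbase.assignment.retry.sleep.initial" setting that the new test overrides. As a rough illustration of the delays such a policy produces, here is a minimal, self-contained sketch of a capped exponential backoff; the class name, the doubling rule, and the 60-second cap are assumptions for illustration, not HBase's actual RetryCounter implementation.

// Illustration only: a capped exponential backoff of the kind the patch relies on.
// The class name, doubling rule, and cap below are hypothetical; only the config
// key "hbase.assignment.retry.sleep.initial" appears in this patch's test.
public final class CloseRetryBackoffSketch {

  /** Backoff before retry number {@code attempt} (1-based): initial * 2^(attempt - 1), capped. */
  static long getBackoffTime(long initialSleepMs, long maxSleepMs, int attempt) {
    long backoff = (long) (initialSleepMs * Math.pow(2, attempt - 1));
    return Math.min(backoff, maxSleepMs);
  }

  public static void main(String[] args) {
    // With the test's 1000 ms seed and a hypothetical 60 s cap, this prints:
    // 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
    for (int attempt = 1; attempt <= 8; attempt++) {
      System.out.println("retry " + attempt + " -> sleep "
          + getBackoffTime(1000L, 60_000L, attempt) + " ms");
    }
  }
}

Note that the unassign loop above deliberately skips the sleep on the final attempt (sleepTime > 0 && i != maximumAttempts), so the terminal FAILED_CLOSE state is reported without one last wait.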
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 69dfa40bc9e..28f9e831531 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -32,11 +32,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
@@ -72,11 +71,13 @@ import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.TestTableName;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -84,9 +85,13 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * This tests AssignmentManager with a testing cluster.
@@ -98,6 +103,8 @@ public class TestAssignmentManagerOnCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   final static Configuration conf = TEST_UTIL.getConfiguration();
   private static HBaseAdmin admin;
+  @Rule
+  public TestTableName testTableName = new TestTableName();
 
   static void setupOnce() throws Exception {
     // Using the our load balancer to control region plans
@@ -598,6 +605,68 @@ public class TestAssignmentManagerOnCluster {
     }
   }
 
+  /**
+   * This tests region close with exponential backoff
+   */
+  @Test(timeout = 60000)
+  public void testCloseRegionWithExponentialBackOff() throws Exception {
+    TableName tableName = testTableName.getTableName();
+    // Set the initial backoff time between retries for a failed close
+    TEST_UTIL.getMiniHBaseCluster().getConf().setLong("hbase.assignment.retry.sleep.initial", 1000);
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+    TEST_UTIL.getMiniHBaseCluster().startMaster(); // restart the master for the conf to take effect
+
+    try {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
+        new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("ExponentialBackOff"));
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc);
+
+      Table meta = new HTable(conf, TableName.META_TABLE_NAME);
+      HRegionInfo hri =
+        new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
+      MetaTableAccessor.addRegionToMeta(meta, hri);
+
+      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+      AssignmentManager am = master.getAssignmentManager();
+      assertTrue(TEST_UTIL.assignRegion(hri));
+      ServerName sn = am.getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, sn, 6000);
+
+      MyRegionObserver.preCloseEnabled.set(true);
+      // Unset the preCloseEnabled flag after 1 second so the next close retry succeeds
+      scheduledThreadPoolExecutor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          MyRegionObserver.preCloseEnabled.set(false);
+        }
+      }, 1000, TimeUnit.MILLISECONDS);
+      am.unassign(hri);
+
+      // region may still be assigned now since it's closing,
+      // let's check if it's assigned after it's out of transition
+      am.waitOnRegionToClearRegionsInTransition(hri);
+
+      // region should be closed and re-assigned
+      assertTrue(am.waitForAssignment(hri));
+      ServerName serverName =
+        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, serverName, 6000);
+    } finally {
+      MyRegionObserver.preCloseEnabled.set(false);
+      TEST_UTIL.deleteTable(tableName);
+
+      // reset the backoff time to default
+      TEST_UTIL.getMiniHBaseCluster().getConf().unset("hbase.assignment.retry.sleep.initial");
+      activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster();
+      TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+      TEST_UTIL.getMiniHBaseCluster().startMaster();
+    }
+  }
+
   /**
    * This tests region open failed
    */
@@ -889,7 +958,7 @@ public class TestAssignmentManagerOnCluster {
   /**
    * This tests region close racing with open
    */
-  @Test (timeout=60000)
+  @Test(timeout = 60000)
   public void testOpenCloseRacing() throws Exception {
     String table = "testOpenCloseRacing";
     try {
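For context, the new test drives the retry path by enabling MyRegionObserver.preCloseEnabled so the first CLOSE fails, then clears the flag from a scheduled task so a retry after the roughly 1 s configured backoff succeeds. MyRegionObserver already exists in TestAssignmentManagerOnCluster and is not part of this patch; its close hook looks roughly like the sketch below (paraphrased, details may differ).

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

// Paraphrased sketch of the existing test coprocessor; the real class lives in
// TestAssignmentManagerOnCluster and may differ in detail.
public class MyRegionObserver extends BaseRegionObserver {
  // While true, every region close is rejected from the coprocessor,
  // forcing AssignmentManager.unassign() into its retry/backoff path.
  static final AtomicBoolean preCloseEnabled = new AtomicBoolean(false);

  @Override
  public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
      throws IOException {
    if (preCloseEnabled.get()) {
      throw new IOException("Rejecting close: preCloseEnabled is set");
    }
  }
}

With the 1000 ms initial backoff the test configures and the 1-second flag flip, the region should close on an early retry, well inside the 60-second test timeout.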