HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed region close requests (#1755)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>

parent ee869b9aea
commit 3c138845d9

hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java

@@ -37,8 +37,8 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1985,7 +1984,7 @@ public class AssignmentManager extends ZooKeeperListener {
       // ClosedRegionhandler can remove the server from this.regions
       if (!serverManager.isServerOnline(server)) {
         LOG.debug("Offline " + region.getRegionNameAsString()
-            + ", no need to unassign since it's on a dead server: " + server);
+          + ", no need to unassign since it's on a dead server: " + server);
         if (transitionInZK) {
           // delete the node. if no node exists need not bother.
           deleteClosingOrClosedNode(region, server);
@@ -1995,42 +1994,39 @@ public class AssignmentManager extends ZooKeeperListener {
         }
         return;
       }
+      long sleepTime = 0;
       try {
         // Send CLOSE RPC
-        if (serverManager.sendRegionClose(server, region,
-          versionOfClosingNode, dest, transitionInZK)) {
-          LOG.debug("Sent CLOSE to " + server + " for region " +
-            region.getRegionNameAsString());
+        if (serverManager.sendRegionClose(server, region, versionOfClosingNode, dest,
+          transitionInZK)) {
+          LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString());
           if (useZKForAssignment && !transitionInZK && state != null) {
             // Retry to make sure the region is
             // closed so as to avoid double assignment.
-            unassign(region, state, versionOfClosingNode,
-              dest, transitionInZK, src);
+            unassign(region, state, versionOfClosingNode, dest, transitionInZK, src);
           }
           return;
         }
         // This never happens. Currently regionserver close always return true.
         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
-        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
-          region.getRegionNameAsString());
+        LOG.warn("Server " + server + " region CLOSE RPC returned false for "
+          + region.getRegionNameAsString());
       } catch (Throwable t) {
-        long sleepTime = 0;
         Configuration conf = this.server.getConfiguration();
         if (t instanceof RemoteException) {
-          t = ((RemoteException)t).unwrapRemoteException();
+          t = ((RemoteException) t).unwrapRemoteException();
         }
         boolean logRetries = true;
-        if (t instanceof RegionServerAbortedException
-            || t instanceof RegionServerStoppedException
+        if (t instanceof RegionServerStoppedException
             || t instanceof ServerNotRunningYetException) {
           // RS is aborting or stopping, we cannot offline the region since the region may need
-          // to do WAL recovery.  Until we see the RS expiration, we should retry.
+          // to do WAL recovery. Until we see the RS expiration, we should retry.
           sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);

         } else if (t instanceof NotServingRegionException) {
-          LOG.debug("Offline " + region.getRegionNameAsString()
-            + ", it's not any more on " + server, t);
+          LOG.debug(
+            "Offline " + region.getRegionNameAsString() + ", it's not any more on " + server, t);
           if (transitionInZK) {
             deleteClosingOrClosedNode(region, server);
           }
@@ -2038,25 +2034,24 @@ public class AssignmentManager extends ZooKeeperListener {
             regionOffline(region);
           }
           return;
-        } else if ((t instanceof FailedServerException) || (state != null &&
-            t instanceof RegionAlreadyInTransitionException)) {
+        } else if ((t instanceof FailedServerException)
+          || (state != null && t instanceof RegionAlreadyInTransitionException)) {
           if (t instanceof FailedServerException) {
             sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-                RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
           } else {
             // RS is already processing this region, only need to update the timestamp
             LOG.debug("update " + state + " the timestamp.");
             state.updateTimestampToNow();
             if (maxWaitTime < 0) {
-              maxWaitTime =
-                EnvironmentEdgeManager.currentTime()
-                  + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
-                    DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
+              maxWaitTime = EnvironmentEdgeManager.currentTime() + conf.getLong(
+                ALREADY_IN_TRANSITION_WAITTIME, DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
             }
             long now = EnvironmentEdgeManager.currentTime();
             if (now < maxWaitTime) {
-              LOG.debug("Region is already in transition; "
-                + "waiting up to " + (maxWaitTime - now) + "ms", t);
+              LOG.debug("Region is already in transition; " + "waiting up to "
+                + (maxWaitTime - now) + "ms",
+                t);
               sleepTime = 100;
               i--; // reset the try count
               logRetries = false;
@@ -2064,28 +2059,40 @@ public class AssignmentManager extends ZooKeeperListener {
             }
           }

-        try {
-          if (sleepTime > 0) {
-            Thread.sleep(sleepTime);
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn("Failed to unassign "
-            + region.getRegionNameAsString() + " since interrupted", ie);
-          Thread.currentThread().interrupt();
-          if (state != null) {
-            regionStates.updateRegionState(region, State.FAILED_CLOSE);
-          }
-          return;
-        }
-
         if (logRetries) {
-          LOG.info("Server " + server + " returned " + t + " for "
-            + region.getRegionNameAsString() + ", try=" + i
-            + " of " + this.maximumAttempts, t);
+          LOG.info("Server " + server + " returned " + t + " for " + region.getRegionNameAsString()
+            + ", try=" + i + " of " + this.maximumAttempts,
+            t);
           // Presume retry or server will expire.
         }
       }
+      // sleepTime is set in one of the following cases (reasons commented above):
+      // 1. Region server stopping or aborting
+      // 2. Region already in transition
+      // 3. Connecting to server that is already dead
+      //
+      // If sleepTime is not set by any of the cases, set it to sleep for
+      // configured exponential backoff time
+      if (sleepTime == 0 && i != maximumAttempts) {
+        sleepTime = backoffPolicy.getBackoffTime(retryConfig, i);
+        LOG.info("Waiting for " + sleepTime + "milliseconds exponential backoff time for "
+          + region.getRegionNameAsString() + " before next retry " + (i + 1) + " of "
+          + this.maximumAttempts);
+      }
+      try {
+        if (sleepTime > 0 && i != maximumAttempts) {
+          Thread.sleep(sleepTime);
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Failed to unassign " + region.getRegionNameAsString() + " since interrupted", ie);
+        if (state != null) {
+          regionStates.updateRegionState(region, State.FAILED_CLOSE);
+        }
+        Thread.currentThread().interrupt();
+        return;
+      }
     }

     // Run out of attempts
     if (state != null) {
       regionStates.updateRegionState(region, State.FAILED_CLOSE);
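
(Not part of the patch.) The hunk above is the heart of the change: when a close RPC fails and none of the special cases picked a wait, the retry loop now asks backoffPolicy.getBackoffTime(retryConfig, i) for a sleep before attempt i + 1. The policy's wiring is not shown in this diff, so the snippet below is only a minimal standalone sketch of the usual scheme, an initial sleep doubled per failed attempt and capped; the class name, constants, and defaults are invented for illustration (the new test below overrides hbase.assignment.retry.sleep.initial to 1000 ms).

// Minimal standalone sketch of exponential backoff between failed close
// retries; names and defaults here are illustrative, not HBase API.
public final class BackoffSketch {
  // Hypothetical defaults: these are assumptions, not the values
  // AssignmentManager actually reads from its retry configuration.
  private static final long INITIAL_SLEEP_MS = 1000L;
  private static final long MAX_SLEEP_MS = 60000L;

  // Roughly what a call like backoffPolicy.getBackoffTime(retryConfig, attempt)
  // is expected to produce: initial sleep doubled per attempt, bounded by a cap.
  static long getBackoffTime(long initialSleepMs, long maxSleepMs, int attempt) {
    double backoff = initialSleepMs * Math.pow(2, attempt);
    return (long) Math.min(backoff, maxSleepMs);
  }

  public static void main(String[] args) {
    for (int attempt = 1; attempt <= 6; attempt++) {
      System.out.println("try=" + attempt + " sleep="
          + getBackoffTime(INITIAL_SLEEP_MS, MAX_SLEEP_MS, attempt) + "ms");
    }
  }
}
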
@@ -2108,54 +2115,57 @@ public class AssignmentManager extends ZooKeeperListener {
       LOG.debug("Force region state offline " + state);
     }

-    switch (state.getState()) {
-    case OPEN:
-    case OPENING:
-    case PENDING_OPEN:
-    case CLOSING:
-    case PENDING_CLOSE:
-      if (!forceNewPlan) {
-        LOG.debug("Skip assigning " +
-          region + ", it is already " + state);
-        return null;
-      }
-    case FAILED_CLOSE:
-    case FAILED_OPEN:
-      unassign(region, state, -1, null, false, null);
-      state = regionStates.getRegionState(region);
-      if (state.isFailedClose()) {
-        // If we can't close the region, we can't re-assign
-        // it so as to avoid possible double assignment/data loss.
-        LOG.info("Skip assigning " +
-          region + ", we couldn't close it: " + state);
-        return null;
-      }
-    case OFFLINE:
-      // This region could have been open on this server
-      // for a while. If the server is dead and not processed
-      // yet, we can move on only if the meta shows the
-      // region is not on this server actually, or on a server
-      // not dead, or dead and processed already.
-      // In case not using ZK, we don't need this check because
-      // we have the latest info in memory, and the caller
-      // will do another round checking any way.
-      if (useZKForAssignment
-          && regionStates.isServerDeadAndNotProcessed(sn)
-          && wasRegionOnDeadServerByMeta(region, sn)) {
-        if (!regionStates.isRegionInTransition(region)) {
-          LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
-          regionStates.updateRegionState(region, State.OFFLINE);
-        }
-        LOG.info("Skip assigning " + region.getRegionNameAsString()
-          + ", it is on a dead but not processed yet server: " + sn);
-        return null;
-      }
-    case CLOSED:
-      break;
-    default:
-      LOG.error("Trying to assign region " + region
-        + ", which is " + state);
-      return null;
-    }
+    // We need a lock on the region as we could update it
+    Lock lock = locker.acquireLock(region.getEncodedName());
+    try {
+      switch (state.getState()) {
+        case OPEN:
+        case OPENING:
+        case PENDING_OPEN:
+        case CLOSING:
+        case PENDING_CLOSE:
+          if (!forceNewPlan) {
+            LOG.debug("Skip assigning " + region + ", it is already " + state);
+            return null;
+          }
+        case FAILED_CLOSE:
+        case FAILED_OPEN:
+          unassign(region, state, -1, null, false, null);
+          state = regionStates.getRegionState(region);
+          if (state.isFailedClose()) {
+            // If we can't close the region, we can't re-assign
+            // it so as to avoid possible double assignment/data loss.
+            LOG.info("Skip assigning " + region + ", we couldn't close it: " + state);
+            return null;
+          }
+        case OFFLINE:
+          // This region could have been open on this server
+          // for a while. If the server is dead and not processed
+          // yet, we can move on only if the meta shows the
+          // region is not on this server actually, or on a server
+          // not dead, or dead and processed already.
+          // In case not using ZK, we don't need this check because
+          // we have the latest info in memory, and the caller
+          // will do another round checking any way.
+          if (useZKForAssignment && regionStates.isServerDeadAndNotProcessed(sn)
+            && wasRegionOnDeadServerByMeta(region, sn)) {
+            if (!regionStates.isRegionInTransition(region)) {
+              LOG.info(
+                "Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
+              regionStates.updateRegionState(region, State.OFFLINE);
+            }
+            LOG.info("Skip assigning " + region.getRegionNameAsString()
+              + ", it is on a dead but not processed yet server: " + sn);
+            return null;
+          }
+        case CLOSED:
+          break;
+        default:
+          LOG.error("Trying to assign region " + region + ", which is " + state);
+          return null;
+      }
+    } finally {
+      lock.unlock();
+    }
     return state;
   }
@@ -3623,7 +3633,7 @@ public class AssignmentManager extends ZooKeeperListener {
     if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) {
       // Sleep before reassigning if this region has failed to open before
       long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
-          getFailedAttempts(regionInfo.getEncodedName()));
+        getFailedAttempts(regionInfo.getEncodedName()));
       invokeAssignLater(regionInfo, forceNewPlan, sleepTime);
     } else {
       // Immediately reassign if this region has never failed an open before
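
(Not part of the patch.) The failed-open path in the hunk above already used this backoff policy, but instead of sleeping inline it hands the computed wait to invokeAssignLater, which re-queues the assign on a scheduled executor. A rough standalone sketch of that "retry later with backoff" pattern follows; the Assigner interface and all names are hypothetical, not HBase API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone sketch: schedule another attempt after the computed backoff
// instead of blocking the calling thread. Names are invented for illustration.
public final class DelayedRetrySketch {
  interface Assigner {
    void assign(String regionName, boolean forceNewPlan);
  }

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  // Mirrors invokeAssignLater(regionInfo, forceNewPlan, sleepTime) in spirit:
  // fire the next assign attempt once the backoff has elapsed.
  void invokeAssignLater(final Assigner assigner, final String regionName,
      final boolean forceNewPlan, long sleepTimeMs) {
    scheduler.schedule(new Runnable() {
      @Override
      public void run() {
        assigner.assign(regionName, forceNewPlan);
      }
    }, sleepTimeMs, TimeUnit.MILLISECONDS);
  }
}
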

hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java

@@ -32,11 +32,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
@@ -72,11 +71,13 @@ import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.TestTableName;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -84,9 +85,13 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * This tests AssignmentManager with a testing cluster.
@@ -98,6 +103,8 @@ public class TestAssignmentManagerOnCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   final static Configuration conf = TEST_UTIL.getConfiguration();
   private static HBaseAdmin admin;
+  @Rule
+  public TestTableName testTableName = new TestTableName();

   static void setupOnce() throws Exception {
     // Using the our load balancer to control region plans
@@ -598,6 +605,68 @@ public class TestAssignmentManagerOnCluster {
     }
   }

+  /**
+   * This tests region close with exponential backoff
+   */
+  @Test(timeout = 60000)
+  public void testCloseRegionWithExponentialBackOff() throws Exception {
+    TableName tableName = testTableName.getTableName();
+    // Set the backoff time between each retry for failed close
+    TEST_UTIL.getMiniHBaseCluster().getConf().setLong("hbase.assignment.retry.sleep.initial", 1000);
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+    TEST_UTIL.getMiniHBaseCluster().startMaster(); // restart the master for conf take into affect
+
+    try {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
+        new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("ExponentialBackOff"));
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc);
+
+      Table meta = new HTable(conf, TableName.META_TABLE_NAME);
+      HRegionInfo hri =
+        new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
+      MetaTableAccessor.addRegionToMeta(meta, hri);
+
+      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+      AssignmentManager am = master.getAssignmentManager();
+      assertTrue(TEST_UTIL.assignRegion(hri));
+      ServerName sn = am.getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, sn, 6000);
+
+      MyRegionObserver.preCloseEnabled.set(true);
+      // Unset the precloseEnabled flag after 1 second for next retry to succeed
+      scheduledThreadPoolExecutor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          MyRegionObserver.preCloseEnabled.set(false);
+        }
+      }, 1000, TimeUnit.MILLISECONDS);
+      am.unassign(hri);
+
+      // region may still be assigned now since it's closing,
+      // let's check if it's assigned after it's out of transition
+      am.waitOnRegionToClearRegionsInTransition(hri);
+
+      // region should be closed and re-assigned
+      assertTrue(am.waitForAssignment(hri));
+      ServerName serverName =
+        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, serverName, 6000);
+    } finally {
+      MyRegionObserver.preCloseEnabled.set(false);
+      TEST_UTIL.deleteTable(tableName);
+
+      // reset the backoff time to default
+      TEST_UTIL.getMiniHBaseCluster().getConf().unset("hbase.assignment.retry.sleep.initial");
+      activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster();
+      TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+      TEST_UTIL.getMiniHBaseCluster().startMaster();
+    }
+  }
+
   /**
    * This tests region open failed
    */
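
(Not part of the patch.) The new test above appears to lean on the test class's existing MyRegionObserver coprocessor: while preCloseEnabled is set the first close attempt fails, and the schedule() call clears the flag after one second so a later, backed-off retry can succeed; hbase.assignment.retry.sleep.initial is shrunk to 1000 ms so the whole sequence fits in the test timeout. A standalone sketch of that fail-once-then-heal pattern, with invented names:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone sketch of the test's "fail the first close, heal after a delay"
// idea; the flag, messages, and fixed sleeps are illustrative only.
public final class FailOnceSketch {
  static final AtomicBoolean failClose = new AtomicBoolean(true);

  public static void main(String[] args) throws InterruptedException {
    ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
    // Clear the failure flag after one second, like the schedule() call that
    // unsets MyRegionObserver.preCloseEnabled so a retried close can succeed.
    scheduler.schedule(new Runnable() {
      @Override
      public void run() {
        failClose.set(false);
      }
    }, 1000, TimeUnit.MILLISECONDS);

    // Simulated retry loop: the first attempt fails, a later one succeeds.
    for (int attempt = 1; attempt <= 5; attempt++) {
      if (failClose.get()) {
        System.out.println("try=" + attempt + ": close rejected, backing off");
        Thread.sleep(500L * attempt); // stand-in for the exponential backoff
      } else {
        System.out.println("try=" + attempt + ": close succeeded");
        break;
      }
    }
    scheduler.shutdown();
  }
}
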
@@ -889,7 +958,7 @@ public class TestAssignmentManagerOnCluster {
   /**
    * This tests region close racing with open
    */
-  @Test (timeout=60000)
+  @Test(timeout = 60000)
   public void testOpenCloseRacing() throws Exception {
     String table = "testOpenCloseRacing";
     try {