HBASE-9793 Offline a region before it's closed could cause double assignment
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1533525 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2082bd4967
commit
000596f7c3
|
@ -114,6 +114,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
|
||||
public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
|
||||
|
||||
public static final String ALREADY_IN_TRANSITION_WAITTIME
|
||||
= "hbase.assignment.already.intransition.waittime";
|
||||
public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
|
||||
|
||||
protected final Server server;
|
||||
|
||||
private ServerManager serverManager;
|
||||
|
@ -1666,7 +1670,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
if (state != null) {
|
||||
server = state.getServerName();
|
||||
}
|
||||
long maxWaitTime = -1;
|
||||
for (int i = 1; i <= this.maximumAttempts; i++) {
|
||||
if (this.server.isStopped() || this.server.isAborted()) {
|
||||
LOG.debug("Server stopped/aborted; skipping unassign of " + region);
|
||||
return;
|
||||
}
|
||||
// ClosedRegionhandler can remove the server from this.regions
|
||||
if (!serverManager.isServerOnline(server)) {
|
||||
if (transitionInZK) {
|
||||
|
@ -1685,7 +1694,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.debug("Sent CLOSE to " + server + " for region " +
|
||||
region.getRegionNameAsString());
|
||||
if (!transitionInZK && state != null) {
|
||||
regionOffline(region);
|
||||
// Retry to make sure the region is
|
||||
// closed so as to avoid double assignment.
|
||||
unassign(region, state, versionOfClosingNode,
|
||||
dest, transitionInZK,src);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -1711,13 +1723,36 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// RS is already processing this region, only need to update the timestamp
|
||||
LOG.debug("update " + state + " the timestamp.");
|
||||
state.updateTimestampToNow();
|
||||
if (maxWaitTime < 0) {
|
||||
maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
|
||||
+ this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
|
||||
DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
|
||||
}
|
||||
try {
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
if (now < maxWaitTime) {
|
||||
LOG.debug("Region is already in transition; "
|
||||
+ "waiting up to " + (maxWaitTime - now) + "ms", t);
|
||||
Thread.sleep(100);
|
||||
i--; // reset the try count
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Failed to unassign "
|
||||
+ region.getRegionNameAsString() + " since interrupted", ie);
|
||||
Thread.currentThread().interrupt();
|
||||
if (!tomActivated) {
|
||||
regionStates.updateRegionState(region, State.FAILED_CLOSE);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
LOG.info("Server " + server + " returned " + t + " for "
|
||||
+ region.getRegionNameAsString() + ", try=" + i
|
||||
+ " of " + this.maximumAttempts, t);
|
||||
// Presume retry or server will expire.
|
||||
}
|
||||
}
|
||||
}
|
||||
// Run out of attempts
|
||||
if (!tomActivated && state != null) {
|
||||
regionStates.updateRegionState(region, State.FAILED_CLOSE);
|
||||
|
@ -1819,10 +1854,15 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
RegionState currentState = state;
|
||||
int versionOfOfflineNode = -1;
|
||||
RegionPlan plan = null;
|
||||
long maxRegionServerStartupWaitTime = -1;
|
||||
long maxWaitTime = -1;
|
||||
HRegionInfo region = state.getRegion();
|
||||
RegionOpeningState regionOpenState;
|
||||
for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
|
||||
for (int i = 1; i <= maximumAttempts; i++) {
|
||||
if (server.isStopped() || server.isAborted()) {
|
||||
LOG.info("Skip assigning " + region.getRegionNameAsString()
|
||||
+ ", the server is stopped/aborted");
|
||||
return;
|
||||
}
|
||||
if (plan == null) { // Get a server for the region at first
|
||||
try {
|
||||
plan = getRegionPlan(region, forceNewPlan);
|
||||
|
@ -1883,10 +1923,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
continue;
|
||||
}
|
||||
}
|
||||
if (this.server.isStopped() || this.server.isAborted()) {
|
||||
LOG.debug("Server stopped/aborted; skipping assign of " + region);
|
||||
return;
|
||||
}
|
||||
LOG.info("Assigning " + region.getRegionNameAsString() +
|
||||
" to " + plan.getDestination().toString());
|
||||
// Transition RegionState to PENDING_OPEN
|
||||
|
@ -1942,20 +1978,25 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
|
||||
"try=" + i + " of " + this.maximumAttempts, t);
|
||||
|
||||
if (maxRegionServerStartupWaitTime < 0) {
|
||||
maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
|
||||
this.server.getConfiguration().
|
||||
if (maxWaitTime < 0) {
|
||||
if (t instanceof RegionAlreadyInTransitionException) {
|
||||
maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
|
||||
+ this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
|
||||
DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
|
||||
} else {
|
||||
maxWaitTime = this.server.getConfiguration().
|
||||
getLong("hbase.regionserver.rpc.startup.waittime", 60000);
|
||||
}
|
||||
}
|
||||
try {
|
||||
needNewPlan = false;
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
if (now < maxRegionServerStartupWaitTime) {
|
||||
LOG.debug("Server is not yet up; waiting up to " +
|
||||
(maxRegionServerStartupWaitTime - now) + "ms", t);
|
||||
if (now < maxWaitTime) {
|
||||
LOG.debug("Server is not yet up or region is already in transition; "
|
||||
+ "waiting up to " + (maxWaitTime - now) + "ms", t);
|
||||
Thread.sleep(100);
|
||||
i--; // reset the try count
|
||||
needNewPlan = false;
|
||||
} else {
|
||||
} else if (!(t instanceof RegionAlreadyInTransitionException)) {
|
||||
LOG.debug("Server is not up for a while; try a new one", t);
|
||||
needNewPlan = true;
|
||||
}
|
||||
|
|
|
@ -553,6 +553,8 @@ public class TestAssignmentManagerOnCluster {
|
|||
// Unassign it again forcefully so that we can trigger already
|
||||
// in transition exception. This test is to make sure this scenario
|
||||
// is handled properly.
|
||||
am.server.getConfiguration().setLong(
|
||||
AssignmentManager.ALREADY_IN_TRANSITION_WAITTIME, 1000);
|
||||
am.unassign(hri, true);
|
||||
RegionState state = am.getRegionStates().getRegionState(hri);
|
||||
assertEquals(RegionState.State.FAILED_CLOSE, state.getState());
|
||||
|
|
Loading…
Reference in New Issue