HBASE-20137 TestRSGroups is flakey
On failed RPC we expire the server and suspend expecting the resultant ServerCrashProcedure to wake us back up again. In tests, TestRSGroup hung because it failed to schedule a server expiration because the server was already expired undergoing processing (the test was shutting down). Deal with this case by having expire servers return false if unable to expire. Callers will then know whether a ServerCrashProcedure has been scheduled or not. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Have expireServer return true if successful. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java The log that included an exception whose message was the current procedure as a String totally baffled me. Make it more obvious what exception is. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java If failed expire of a server, wake our procedure -- do not suspend -- and presume ok to move region to CLOSED state (because going down or concurrent crashed server processing ongoing).
This commit is contained in:
parent
7889df3711
commit
1f5e93a8f8
|
@ -555,15 +555,17 @@ public class ServerManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Expire the passed server. Add it to list of dead servers and queue a
|
* Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
|
||||||
* shutdown processing.
|
* @return True if we expired passed <code>serverName</code> else false if we failed to schedule
|
||||||
|
* an expire (and attendant ServerCrashProcedure -- some clients are dependent on
|
||||||
|
* server crash procedure being queued and need to know if has not been queued).
|
||||||
*/
|
*/
|
||||||
public synchronized void expireServer(final ServerName serverName) {
|
public synchronized boolean expireServer(final ServerName serverName) {
|
||||||
if (serverName.equals(master.getServerName())) {
|
if (serverName.equals(master.getServerName())) {
|
||||||
if (!(master.isAborted() || master.isStopped())) {
|
if (!(master.isAborted() || master.isStopped())) {
|
||||||
master.stop("We lost our znode?");
|
master.stop("We lost our znode?");
|
||||||
}
|
}
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
if (!master.isServerCrashProcessingEnabled()) {
|
if (!master.isServerCrashProcessingEnabled()) {
|
||||||
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
|
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
|
||||||
|
@ -573,13 +575,13 @@ public class ServerManager {
|
||||||
// the SCP is not enable yet and Meta's RIT may be suspend forever. See HBase-19287
|
// the SCP is not enable yet and Meta's RIT may be suspend forever. See HBase-19287
|
||||||
master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName);
|
master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName);
|
||||||
this.queuedDeadServers.add(serverName);
|
this.queuedDeadServers.add(serverName);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
if (this.deadservers.isDeadServer(serverName)) {
|
if (this.deadservers.isDeadServer(serverName)) {
|
||||||
// TODO: Can this happen? It shouldn't be online in this case?
|
// TODO: Can this happen? It shouldn't be online in this case?
|
||||||
LOG.warn("Expiration of " + serverName +
|
LOG.warn("Expiration of " + serverName +
|
||||||
" but server shutdown already in progress");
|
" but server shutdown already in progress");
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
moveFromOnlineToDeadServers(serverName);
|
moveFromOnlineToDeadServers(serverName);
|
||||||
|
|
||||||
|
@ -591,7 +593,7 @@ public class ServerManager {
|
||||||
if (this.onlineServers.isEmpty()) {
|
if (this.onlineServers.isEmpty()) {
|
||||||
master.stop("Cluster shutdown set; onlineServer=0");
|
master.stop("Cluster shutdown set; onlineServer=0");
|
||||||
}
|
}
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
|
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
|
||||||
master.getAssignmentManager().submitServerCrash(serverName, true);
|
master.getAssignmentManager().submitServerCrash(serverName, true);
|
||||||
|
@ -602,6 +604,7 @@ public class ServerManager {
|
||||||
listener.serverRemoved(serverName);
|
listener.serverRemoved(serverName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -21,12 +21,17 @@ import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used internally signaling failed queue of a remote procedure
|
* Used internally signaling failed queue of a remote procedure operation.
|
||||||
* operation.
|
* Usually happens because no such remote server; it is being processed as crashed so it is not
|
||||||
|
* online at time of RPC. Otherwise, something unexpected happened.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FailedRemoteDispatchException extends HBaseIOException {
|
public class FailedRemoteDispatchException extends HBaseIOException {
|
||||||
|
public FailedRemoteDispatchException() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
public FailedRemoteDispatchException(String msg) {
|
public FailedRemoteDispatchException(String msg) {
|
||||||
super(msg);
|
super(msg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,10 +177,8 @@ public abstract class RegionTransitionProcedure
|
||||||
public void remoteCallFailed(final MasterProcedureEnv env,
|
public void remoteCallFailed(final MasterProcedureEnv env,
|
||||||
final ServerName serverName, final IOException exception) {
|
final ServerName serverName, final IOException exception) {
|
||||||
final RegionStateNode regionNode = getRegionState(env);
|
final RegionStateNode regionNode = getRegionState(env);
|
||||||
String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
|
LOG.warn("Remote call failed {}; rit={}, exception={}", this, regionNode.getState(),
|
||||||
exception.getMessage();
|
exception.toString());
|
||||||
LOG.warn("Remote call failed " + this + "; " + regionNode.toShortString() +
|
|
||||||
"; exception=" + msg);
|
|
||||||
if (remoteCallFailed(env, regionNode, exception)) {
|
if (remoteCallFailed(env, regionNode, exception)) {
|
||||||
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
// NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
|
||||||
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
// Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
|
||||||
|
@ -215,9 +213,14 @@ public abstract class RegionTransitionProcedure
|
||||||
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
|
// backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
|
||||||
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
|
// ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
|
||||||
// wake to undo the above suspend.
|
// wake to undo the above suspend.
|
||||||
|
//
|
||||||
|
// We fail the addOperationToNode usually because there is no such remote server (it has
|
||||||
|
// crashed and we are currently processing it or something went badly wrong and we have a
|
||||||
|
// bad server).
|
||||||
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
|
||||||
remoteCallFailed(env, targetServer,
|
remoteCallFailed(env, targetServer, targetServer == null?
|
||||||
new FailedRemoteDispatchException(this + " to " + targetServer));
|
new FailedRemoteDispatchException():
|
||||||
|
new FailedRemoteDispatchException(targetServer.toShortString()));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -249,17 +249,12 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
final IOException exception) {
|
final IOException exception) {
|
||||||
// TODO: Is there on-going rpc to cleanup?
|
// TODO: Is there on-going rpc to cleanup?
|
||||||
if (exception instanceof ServerCrashException) {
|
if (exception instanceof ServerCrashException) {
|
||||||
// This exception comes from ServerCrashProcedure after log splitting.
|
// This exception comes from ServerCrashProcedure after it is done with log splitting.
|
||||||
// SCP found this region as a RIT. Its call into here says it is ok to let this procedure go
|
// SCP found this region as a Region-In-Transition (RIT). Its call into here says it is ok to
|
||||||
// on to a complete close now. This will release lock on this region so subsequent action on
|
// let this procedure go on to a complete close now. This will release lock on this region so
|
||||||
// region can succeed; e.g. the assign that follows this unassign when a move (w/o wait on SCP
|
// subsequent action on region can succeed; e.g. the assign that follows this unassign when
|
||||||
// the assign could run w/o logs being split so data loss).
|
// a move (w/o wait on SCP the assign could run w/o logs being split so data loss).
|
||||||
try {
|
reportTransitionCLOSED(env, regionNode);
|
||||||
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
|
|
||||||
} catch (UnexpectedStateException e) {
|
|
||||||
// Should never happen.
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
} else if (exception instanceof RegionServerAbortedException ||
|
} else if (exception instanceof RegionServerAbortedException ||
|
||||||
exception instanceof RegionServerStoppedException ||
|
exception instanceof RegionServerStoppedException ||
|
||||||
exception instanceof ServerNotRunningYetException) {
|
exception instanceof ServerNotRunningYetException) {
|
||||||
|
@ -273,17 +268,33 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
||||||
exception);
|
exception);
|
||||||
setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
|
setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Expiring server " + this + "; " + regionNode.toShortString() +
|
LOG.warn("Expiring server {}; rit={}, exception={}", this, regionNode.getState(),
|
||||||
", exception=" + exception);
|
exception.toString());
|
||||||
env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
|
if (env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation())) {
|
||||||
// Return false so this procedure stays in suspended state. It will be woken up by a
|
// Return false so this procedure stays in suspended state. It will be woken up by
|
||||||
// ServerCrashProcedure when it notices this RIT.
|
// ServerCrashProcedure when it notices this RIT and calls this method again but with
|
||||||
|
// a SCPException -- see above.
|
||||||
// TODO: Add a SCP as a new subprocedure that we now come to depend on.
|
// TODO: Add a SCP as a new subprocedure that we now come to depend on.
|
||||||
return false;
|
return false;
|
||||||
|
} else {
|
||||||
|
LOG.warn("Failed expire of {}; presumed CRASHED; moving region to CLOSED state",
|
||||||
|
regionNode.getRegionLocation());
|
||||||
|
reportTransitionCLOSED(env, regionNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void reportTransitionCLOSED(final MasterProcedureEnv env,
|
||||||
|
final RegionStateNode regionNode) {
|
||||||
|
try {
|
||||||
|
reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
|
||||||
|
} catch (UnexpectedStateException e) {
|
||||||
|
// Should never happen.
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void toStringClassDetails(StringBuilder sb) {
|
public void toStringClassDetails(StringBuilder sb) {
|
||||||
super.toStringClassDetails(sb);
|
super.toStringClassDetails(sb);
|
||||||
|
|
|
@ -712,7 +712,7 @@ public class TestAssignmentManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
|
private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
|
||||||
public static final int TYPES_OF_FAILURE = 6;
|
public static final int TYPES_OF_FAILURE = 7;
|
||||||
private int invocations;
|
private int invocations;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -724,6 +724,14 @@ public class TestAssignmentManager {
|
||||||
case 2: throw new RegionServerStoppedException("Fake!");
|
case 2: throw new RegionServerStoppedException("Fake!");
|
||||||
case 3: throw new ServerNotRunningYetException("Fake!");
|
case 3: throw new ServerNotRunningYetException("Fake!");
|
||||||
case 4:
|
case 4:
|
||||||
|
// We will expire the server that we failed to rpc against.
|
||||||
|
throw new FailedRemoteDispatchException("Fake!");
|
||||||
|
case 5:
|
||||||
|
// Mark this regionserver as already expiring so we go different code route; i.e. we
|
||||||
|
// FAIL to expire the remote server and presume ok to move region to CLOSED. HBASE-20137.
|
||||||
|
TestAssignmentManager.this.master.getServerManager().expireServer(server);
|
||||||
|
throw new FailedRemoteDispatchException("Fake!");
|
||||||
|
case 6:
|
||||||
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
|
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
|
||||||
executor.schedule(new Runnable() {
|
executor.schedule(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue