HBASE-20173 [AMv2] DisableTableProcedure concurrent to ServerCrashProcedure can deadlock

A DisableTableProcedure can grab a region lock before the ServerCrashProcedure gets to it. Cater to this circumstance, where SCP was unable to make progress, by running the search for RITs against the crashed server a second time, after all crashed-server assigns have been created. The second run uncovers RITs such as the DisableTableProcedure unassign above and interrupts their suspend so both procedures can make progress.

M hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
 Add a new procedure step, run post-assigns, that reruns the RIT finder method.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
 Make this important log more specific about what is going on.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
 Better explanation of what is going on.

M hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
 Add the extra step and run handleRIT a second time after we've queued up all SCP assigns. Also fix a bug: SCP was adding an assign for a RIT that was actually trying to unassign (which made the deadlock more likely).
commit 260ee0da60
parent e80ce3d91b
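For orientation before the diff: the sketch below condenses the two ServerCrashProcedure states that matter for the fix. It is not the literal patch (the full switch follows in the diff); it only pulls together the first and second handleRIT passes, using names that appear in the patch (handleRIT, createAssignProcedures, SERVER_CRASH_HANDLE_RIT2).

    // Condensed sketch of the revised ServerCrashProcedure flow; see the full diff below.
    case SERVER_CRASH_ASSIGN:
      // First pass: fail any RIT pinned to the crashed server. An existing assign recalibrates
      // in place and an unassign is woken to complete, so neither needs a fresh assign;
      // handleRIT now returns only the regions that still need new homes.
      List<RegionInfo> toAssign = handleRIT(env, regionsOnCrashedServer);
      addChildProcedure(env.getAssignmentManager().createAssignProcedures(toAssign));
      setNextState(ServerCrashState.SERVER_CRASH_HANDLE_RIT2);
      break;

    case SERVER_CRASH_HANDLE_RIT2:
      // Second pass: catch a RIT (e.g. a DisableTableProcedure unassign) whose owner grabbed
      // the region lock ahead of this SCP; interrupting its suspend lets both procedures
      // make progress.
      handleRIT(env, regionsOnCrashedServer);
      setNextState(ServerCrashState.SERVER_CRASH_FINISH);
      break;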
@@ -53,12 +53,12 @@ import org.slf4j.LoggerFactory;
 /**
  *
- * Integration test that verifies Procedure V2. <br/><br/>
+ * Integration test that verifies Procedure V2.
  *
  * DDL operations should go through (rollforward or rollback) when primary master is killed by
- * ChaosMonkey (default MASTER_KILLING)<br/><br/>
+ * ChaosMonkey (default MASTER_KILLING).
  *
- * Multiple Worker threads are started to randomly do the following Actions in loops:<br/>
+ * <p></p>Multiple Worker threads are started to randomly do the following Actions in loops:
  * Actions generating and populating tables:
  * <ul>
  * <li>CreateTableAction</li>
@@ -302,6 +302,7 @@ enum ServerCrashState {
   // Removed SERVER_CRASH_CALC_REGIONS_TO_ASSIGN = 7;
   SERVER_CRASH_ASSIGN = 8;
   SERVER_CRASH_WAIT_ON_ASSIGN = 9;
+  SERVER_CRASH_HANDLE_RIT2 = 20;
   SERVER_CRASH_FINISH = 100;
 }
 
@@ -412,4 +413,4 @@ message AddPeerStateData {
 
 message UpdatePeerConfigStateData {
   required ReplicationPeer peer_config = 1;
 }
@@ -1196,7 +1196,7 @@ public class AssignmentManager implements ServerListener {
   private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
     final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
     //if (regionNode.isStuck()) {
-    LOG.warn("TODO Handle stuck in transition: " + regionNode);
+    LOG.warn("STUCK Region-In-Transition {}", regionNode);
   }
 
   // ============================================================================================
@@ -249,10 +249,10 @@ public class UnassignProcedure extends RegionTransitionProcedure {
       final IOException exception) {
     // TODO: Is there on-going rpc to cleanup?
     if (exception instanceof ServerCrashException) {
-      // This exception comes from ServerCrashProcedure after log splitting.
+      // This exception comes from ServerCrashProcedure AFTER log splitting.
       // SCP found this region as a RIT. Its call into here says it is ok to let this procedure go
-      // on to a complete close now. This will release lock on this region so subsequent action on
-      // region can succeed; e.g. the assign that follows this unassign when a move (w/o wait on SCP
+      // complete. This complete will release lock on this region so subsequent action on region
+      // can succeed; e.g. the assign that follows this unassign when a move (w/o wait on SCP
       // the assign could run w/o logs being split so data loss).
       try {
         reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
@@ -263,7 +263,6 @@ public class UnassignProcedure extends RegionTransitionProcedure {
     } else if (exception instanceof RegionServerAbortedException ||
         exception instanceof RegionServerStoppedException ||
         exception instanceof ServerNotRunningYetException) {
-      // TODO
       // RS is aborting, we cannot offline the region since the region may need to do WAL
       // recovery. Until we see the RS expiration, we should retry.
       // TODO: This should be suspend like the below where we call expire on server?
@@ -276,8 +275,10 @@ public class UnassignProcedure extends RegionTransitionProcedure {
       LOG.warn("Expiring server " + this + "; " + regionNode.toShortString() +
         ", exception=" + exception);
       env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
-      // Return false so this procedure stays in suspended state. It will be woken up by a
-      // ServerCrashProcedure when it notices this RIT.
+      // Return false so this procedure stays in suspended state. It will be woken up by the
+      // ServerCrashProcedure that was scheduled when we called #expireServer above. SCP calls
+      // #handleRIT which will call this method only the exception will be a ServerCrashException
+      // this time around (See above).
       // TODO: Add a SCP as a new subprocedure that we now come to depend on.
       return false;
     }
@@ -159,6 +159,8 @@ public class RecoverMetaProcedure
    * just as we do over in ServerCrashProcedure#handleRIT except less to do here; less context
    * to carry.
    */
+  // NOTE: Make sure any fix or improvement done here is also done in SCP#handleRIT; the methods
+  // have overlap.
   private void handleRIT(MasterProcedureEnv env, RegionInfo ri, ServerName crashedServerName) {
     AssignmentManager am = env.getAssignmentManager();
     RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(ri);
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
-import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
@@ -114,68 +114,79 @@ implements ServerProcedureInterface {
 
     try {
       switch (state) {
       case SERVER_CRASH_START:
         LOG.info("Start " + this);
         // If carrying meta, process it first. Else, get list of regions on crashed server.
         if (this.carryingMeta) {
           setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
         } else {
           setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
         }
         break;
 
       case SERVER_CRASH_GET_REGIONS:
         // If hbase:meta is not assigned, yield.
         if (env.getAssignmentManager().waitMetaLoaded(this)) {
           throw new ProcedureSuspendedException();
         }
 
         this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates()
             .getServerRegionInfoSet(serverName);
         // Where to go next? Depends on whether we should split logs at all or
         // if we should do distributed log splitting.
         if (!this.shouldSplitWal) {
           setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
         } else {
           setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
         }
         break;
 
       case SERVER_CRASH_PROCESS_META:
         processMeta(env);
         setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
         break;
 
       case SERVER_CRASH_SPLIT_LOGS:
         splitLogs(env);
         setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
         break;
 
       case SERVER_CRASH_ASSIGN:
         // If no regions to assign, skip assign and skip to the finish.
         // Filter out meta regions. Those are handled elsewhere in this procedure.
         // Filter changes this.regionsOnCrashedServer.
         if (filterDefaultMetaRegions(regionsOnCrashedServer)) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Assigning regions " +
                 RegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this +
                 "; cycles=" + getCycles());
           }
-          handleRIT(env, regionsOnCrashedServer);
-          AssignmentManager am = env.getAssignmentManager();
-          // createAssignProcedure will try to use the old location for the region deploy.
-          addChildProcedure(am.createAssignProcedures(regionsOnCrashedServer));
-        }
-        setNextState(ServerCrashState.SERVER_CRASH_FINISH);
-        break;
+          // Handle RIT against crashed server. Will cancel any ongoing assigns/unassigns.
+          // Returns list of regions we need to reassign.
+          List<RegionInfo> toAssign = handleRIT(env, regionsOnCrashedServer);
+          AssignmentManager am = env.getAssignmentManager();
+          // CreateAssignProcedure will try to use the old location for the region deploy.
+          addChildProcedure(am.createAssignProcedures(toAssign));
+          setNextState(ServerCrashState.SERVER_CRASH_HANDLE_RIT2);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        }
+        break;
+
+      case SERVER_CRASH_HANDLE_RIT2:
+        // Run the handleRIT again for case where another procedure managed to grab the lock on
+        // a region ahead of this crash handling procedure. Can happen in rare case. See
+        handleRIT(env, regionsOnCrashedServer);
+        setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        break;
 
       case SERVER_CRASH_FINISH:
         services.getAssignmentManager().getRegionStates().removeServer(serverName);
         services.getServerManager().getDeadServers().finish(serverName);
         return Flow.NO_MORE_STATE;
 
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
       }
     } catch (IOException e) {
       LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + getCycles(), e);
@@ -360,18 +371,24 @@ implements ServerProcedureInterface {
    * otherwise we have two assigns going on and they will fight over who has lock.
    * Notify Unassigns. If unable to unassign because server went away, unassigns block waiting
    * on the below callback from a ServerCrashProcedure before proceeding.
-   * @param env
-   * @param regions Regions that were on crashed server
+   * @param regions Regions on the Crashed Server.
+   * @return List of regions we should assign to new homes (not same as regions on crashed server).
    */
-  private void handleRIT(final MasterProcedureEnv env, final List<RegionInfo> regions) {
-    if (regions == null) return;
+  private List<RegionInfo> handleRIT(final MasterProcedureEnv env, List<RegionInfo> regions) {
+    if (regions == null || regions.isEmpty()) {
+      return Collections.emptyList();
+    }
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    final Iterator<RegionInfo> it = regions.iterator();
+    List<RegionInfo> toAssign = new ArrayList<RegionInfo>(regions);
+    // Get an iterator so can remove items.
+    final Iterator<RegionInfo> it = toAssign.iterator();
     ServerCrashException sce = null;
     while (it.hasNext()) {
       final RegionInfo hri = it.next();
       RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(hri);
-      if (rtp == null) continue;
+      if (rtp == null) {
+        continue;
+      }
       // Make sure the RIT is against this crashed server. In the case where there are many
       // processings of a crashed server -- backed up for whatever reason (slow WAL split) --
       // then a previous SCP may have already failed an assign, etc., and it may have a new
@@ -389,11 +406,14 @@ implements ServerProcedureInterface {
         sce = new ServerCrashException(getProcId(), getServerName());
       }
       rtp.remoteCallFailed(env, this.serverName, sce);
-      if (rtp instanceof AssignProcedure) {
-        // If an assign, include it in our return and remove from passed-in list of regions.
-        it.remove();
-      }
+      // If an assign, remove from passed-in list of regions so we subsequently do not create
+      // a new assign; the exisitng assign after the call to remoteCallFailed will recalibrate
+      // and assign to a server other than the crashed one; no need to create new assign.
+      // If an unassign, do not return this region; the above cancel will wake up the unassign and
+      // it will complete. Done.
+      it.remove();
     }
+    return toAssign;
   }
 
   @Override