HBASE-21647 Add status track for splitting WAL tasks
This commit is contained in:
parent
11a0793d72
commit
87d4ab8669
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
|
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
@ -398,6 +399,8 @@ public class TransitRegionStateProcedure
|
||||||
regionNode.setOpenSeqNum(openSeqNum);
|
regionNode.setOpenSeqNum(openSeqNum);
|
||||||
env.getAssignmentManager().regionOpened(regionNode);
|
env.getAssignmentManager().regionOpened(regionNode);
|
||||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||||
|
// if parent procedure is ServerCrashProcedure, update progress
|
||||||
|
ServerCrashProcedure.updateProgress(env, getParentProcId());
|
||||||
// we are done
|
// we are done
|
||||||
regionNode.unsetProcedure(this);
|
regionNode.unsetProcedure(this);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.master.SplitWALManager;
|
||||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||||
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
@ -82,6 +84,10 @@ public class ServerCrashProcedure
|
||||||
|
|
||||||
private boolean carryingMeta = false;
|
private boolean carryingMeta = false;
|
||||||
private boolean shouldSplitWal;
|
private boolean shouldSplitWal;
|
||||||
|
private MonitoredTask status;
|
||||||
|
// currentRunningState is updated when ServerCrashProcedure get scheduled, child procedures update
|
||||||
|
// progress will not update the state because the actual state is overwritten by its next state
|
||||||
|
private ServerCrashState currentRunningState = getInitialState();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Call this constructor queuing up a Procedure.
|
* Call this constructor queuing up a Procedure.
|
||||||
|
@ -113,6 +119,7 @@ public class ServerCrashProcedure
|
||||||
throws ProcedureSuspendedException, ProcedureYieldException {
|
throws ProcedureSuspendedException, ProcedureYieldException {
|
||||||
final MasterServices services = env.getMasterServices();
|
final MasterServices services = env.getMasterServices();
|
||||||
final AssignmentManager am = env.getAssignmentManager();
|
final AssignmentManager am = env.getAssignmentManager();
|
||||||
|
updateProgress(true);
|
||||||
// HBASE-14802
|
// HBASE-14802
|
||||||
// If we have not yet notified that we are processing a dead server, we should do now.
|
// If we have not yet notified that we are processing a dead server, we should do now.
|
||||||
if (!notifiedDeadServer) {
|
if (!notifiedDeadServer) {
|
||||||
|
@ -224,6 +231,7 @@ public class ServerCrashProcedure
|
||||||
LOG.info("removed crashed server {} after splitting done", serverName);
|
LOG.info("removed crashed server {} after splitting done", serverName);
|
||||||
services.getAssignmentManager().getRegionStates().removeServer(serverName);
|
services.getAssignmentManager().getRegionStates().removeServer(serverName);
|
||||||
services.getServerManager().getDeadServers().finish(serverName);
|
services.getServerManager().getDeadServers().finish(serverName);
|
||||||
|
updateProgress(true);
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
@ -299,6 +307,25 @@ public class ServerCrashProcedure
|
||||||
LOG.debug("Done splitting WALs {}", this);
|
LOG.debug("Done splitting WALs {}", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void updateProgress(boolean updateState) {
|
||||||
|
String msg = "Processing ServerCrashProcedure of " + serverName;
|
||||||
|
if (status == null) {
|
||||||
|
status = TaskMonitor.get().createStatus(msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (currentRunningState == ServerCrashState.SERVER_CRASH_FINISH) {
|
||||||
|
status.markComplete(msg + " done");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (updateState) {
|
||||||
|
currentRunningState = getCurrentState();
|
||||||
|
}
|
||||||
|
int childrenLatch = getChildrenLatch();
|
||||||
|
status.setStatus(msg + " current State " + currentRunningState
|
||||||
|
+ (childrenLatch > 0 ? "; remaining num of running child procedures = " + childrenLatch
|
||||||
|
: ""));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
|
protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -387,6 +414,7 @@ public class ServerCrashProcedure
|
||||||
this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
|
this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
updateProgress(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -455,4 +483,15 @@ public class ServerCrashProcedure
|
||||||
protected boolean holdLock(MasterProcedureEnv env) {
|
protected boolean holdLock(MasterProcedureEnv env) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void updateProgress(MasterProcedureEnv env, long parentId) {
|
||||||
|
if (parentId == NO_PROC_ID) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Procedure parentProcedure =
|
||||||
|
env.getMasterServices().getMasterProcedureExecutor().getProcedure(parentId);
|
||||||
|
if (parentProcedure != null && parentProcedure instanceof ServerCrashProcedure) {
|
||||||
|
((ServerCrashProcedure) parentProcedure).updateProgress(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,6 +94,7 @@ public class SplitWALProcedure
|
||||||
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
|
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
}
|
}
|
||||||
|
ServerCrashProcedure.updateProgress(env, getParentProcId());
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
|
Loading…
Reference in New Issue