HBASE-21323 Should not skip force updating for a sub procedure even if it has been finished

This commit is contained in:
zhangduo 2018-10-17 20:51:19 +08:00 committed by Duo Zhang
parent 20a9b8ad00
commit 92b9b0f26d
2 changed files with 39 additions and 3 deletions

View File

@ -376,6 +376,11 @@ public class ProcedureExecutor<TEnvironment> {
this(conf, environment, store, new SimpleProcedureScheduler()); this(conf, environment, store, new SimpleProcedureScheduler());
} }
private boolean isRootFinished(Procedure<?> proc) {
Procedure<?> rootProc = procedures.get(proc.getRootProcId());
return rootProc == null || rootProc.isFinished();
}
private void forceUpdateProcedure(long procId) throws IOException { private void forceUpdateProcedure(long procId) throws IOException {
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
try { try {
@ -384,7 +389,9 @@ public class ProcedureExecutor<TEnvironment> {
LOG.debug("No pending procedure with id = {}, skip force updating.", procId); LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
return; return;
} }
if (proc.isFinished()) { // For a sub procedure which root parent has not been finished, we still need to retain the
// wal even if the procedure itself is finished.
if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
LOG.debug("Procedure {} has already been finished, skip force updating.", proc); LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
return; return;
} }

View File

@ -123,7 +123,34 @@ public class TestForceUpdateProcedure {
@Override @Override
protected Procedure<Void>[] execute(Void env) protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return new Procedure[] { new WaitingProcedure() }; return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static final class DummyProcedure extends Procedure<Void> {
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return null;
} }
@Override @Override
@ -208,10 +235,12 @@ public class TestForceUpdateProcedure {
createStoreAndExecutor(); createStoreAndExecutor();
Map<Class<?>, Procedure<Void>> procMap = new HashMap<>(); Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p)); EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
assertEquals(2, procMap.size()); assertEquals(3, procMap.size());
ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class); ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
assertEquals(ProcedureState.WAITING, parentProc.getState()); assertEquals(ProcedureState.WAITING, parentProc.getState());
WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class); WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState()); assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
} }
} }