HBASE-21323 Should not skip force updating for a sub procedure even if it has been finished
This commit is contained in:
parent
92fdc8dd51
commit
132bea9a1c
|
@ -376,6 +376,11 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
this(conf, environment, store, new SimpleProcedureScheduler());
|
||||
}
|
||||
|
||||
private boolean isRootFinished(Procedure<?> proc) {
|
||||
Procedure<?> rootProc = procedures.get(proc.getRootProcId());
|
||||
return rootProc == null || rootProc.isFinished();
|
||||
}
|
||||
|
||||
private void forceUpdateProcedure(long procId) throws IOException {
|
||||
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
|
||||
try {
|
||||
|
@ -384,7 +389,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
|
||||
return;
|
||||
}
|
||||
if (proc.isFinished()) {
|
||||
// For a sub procedure which root parent has not been finished, we still need to retain the
|
||||
// wal even if the procedure itself is finished.
|
||||
if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
|
||||
LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -123,7 +123,34 @@ public class TestForceUpdateProcedure {
|
|||
@Override
|
||||
protected Procedure<Void>[] execute(Void env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
return new Procedure[] { new WaitingProcedure() };
|
||||
return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(Void env) throws IOException, InterruptedException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(Void env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DummyProcedure extends Procedure<Void> {
|
||||
|
||||
@Override
|
||||
protected Procedure<Void>[] execute(Void env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -208,10 +235,12 @@ public class TestForceUpdateProcedure {
|
|||
createStoreAndExecutor();
|
||||
Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
|
||||
EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
|
||||
assertEquals(2, procMap.size());
|
||||
assertEquals(3, procMap.size());
|
||||
ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
|
||||
assertEquals(ProcedureState.WAITING, parentProc.getState());
|
||||
WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
|
||||
assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
|
||||
DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
|
||||
assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue