HBASE-21323 Should not skip force updating for a sub procedure even if it has been finished

Reapplication after fixing failing test.
This commit is contained in:
zhangduo 2018-10-17 20:51:19 +08:00 committed by Michael Stack
parent 63f718974b
commit 7e4cb7d7ec
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
2 changed files with 53 additions and 5 deletions

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
@ -376,6 +377,11 @@ public class ProcedureExecutor<TEnvironment> {
this(conf, environment, store, new SimpleProcedureScheduler());
}
private boolean isRootFinished(Procedure<?> proc) {
Procedure<?> rootProc = procedures.get(proc.getRootProcId());
return rootProc == null || rootProc.isFinished();
}
private void forceUpdateProcedure(long procId) throws IOException {
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
try {
@ -384,7 +390,9 @@ public class ProcedureExecutor<TEnvironment> {
LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
return;
}
if (proc.isFinished()) {
// For a sub procedure which root parent has not been finished, we still need to retain the
// wal even if the procedure itself is finished.
if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
return;
}
@ -1396,6 +1404,18 @@ public class ProcedureExecutor<TEnvironment> {
return false;
}
/**
* Should only be used when starting up, where the procedure workers have not been started.
* <p/>
* If the procedure works has been started, the return values maybe changed when you are
* processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
* it will do a copy, and also include the finished procedures.
*/
public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
return procedures.values();
}
/**
* Get procedures.
* @return the procedures in a list

View File

@ -123,7 +123,34 @@ public class TestForceUpdateProcedure {
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return new Procedure[] { new WaitingProcedure() };
return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static final class DummyProcedure extends Procedure<Void> {
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return null;
}
@Override
@ -207,12 +234,13 @@ public class TestForceUpdateProcedure {
stopStoreAndExecutor();
createStoreAndExecutor();
Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
EXEC.getProcedures().stream().filter(p -> !p.isFinished())
.forEach(p -> procMap.put(p.getClass(), p));
assertEquals(2, procMap.size());
EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
assertEquals(3, procMap.size());
ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
assertEquals(ProcedureState.WAITING, parentProc.getState());
WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
}
}