HBASE-17863: Procedure V2: Some cleanup around Procedure.isFinished() and procedure executor
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
ec5188df30
commit
9109803891
|
@ -24,5 +24,5 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
public enum ProcedureState {
|
||||
INITIALIZING, RUNNABLE, WAITING, WAITING_TIMEOUT, ROLLEDBACK, FINISHED;
|
||||
INITIALIZING, RUNNABLE, WAITING, WAITING_TIMEOUT, ROLLEDBACK, SUCCESS, FAILED;
|
||||
}
|
||||
|
|
|
@ -216,9 +216,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
}
|
||||
|
||||
/**
|
||||
* By default, the executor will try ro run procedures start to finish.
|
||||
* By default, the executor will try to run procedures start to finish.
|
||||
* Return true to make the executor yield between each execution step to
|
||||
* give other procedures time to run their steps.
|
||||
* give other procedures a chance to run.
|
||||
* @param env the environment passed to the ProcedureExecutor
|
||||
* @return Return true if the executor should yield on completion of an execution step.
|
||||
* Defaults to return false.
|
||||
|
@ -271,7 +271,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
toStringState(sb);
|
||||
|
||||
if (hasException()) {
|
||||
sb.append(", failed=" + getException());
|
||||
sb.append(", exception=" + getException());
|
||||
}
|
||||
|
||||
sb.append(", ");
|
||||
|
@ -505,6 +505,25 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
// just because the procedure can get scheduled on different executor threads on each step.
|
||||
// ==============================================================================================
|
||||
|
||||
/**
|
||||
* Procedure has states which are defined in proto file. At some places in the code, we
|
||||
* need to determine more about those states. Following Methods help determine:
|
||||
*
|
||||
* {@link #isFailed()} - A procedure has executed at least once and has failed. The procedure
|
||||
* may or may not have rolled back yet. Any procedure in FAILED state
|
||||
* will be eventually moved to ROLLEDBACK state.
|
||||
*
|
||||
* {@link #isSuccess()} - A procedure is completed successfully without any exception.
|
||||
*
|
||||
* {@link #isFinished()} - As a procedure in FAILED state will be tried forever for rollback, only
|
||||
* condition when scheduler/ executor will drop procedure from further
|
||||
* processing is when procedure state is ROLLEDBACK or isSuccess()
|
||||
* returns true. This is a terminal state of the procedure.
|
||||
*
|
||||
* {@link #isWaiting()} - Procedure is in one of the two waiting states ({@link
|
||||
* ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).
|
||||
*/
|
||||
|
||||
/**
|
||||
* @return true if the procedure is in a RUNNABLE state.
|
||||
*/
|
||||
|
@ -517,34 +536,25 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return true if the procedure has failed.
|
||||
* true may mean failed but not yet rolledback or failed and rolledback.
|
||||
* @return true if the procedure has failed. It may or may not have rolled back.
|
||||
*/
|
||||
public synchronized boolean isFailed() {
|
||||
return exception != null || state == ProcedureState.ROLLEDBACK;
|
||||
return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the procedure is finished successfully.
|
||||
*/
|
||||
public synchronized boolean isSuccess() {
|
||||
return state == ProcedureState.FINISHED && exception == null;
|
||||
return state == ProcedureState.SUCCESS && !hasException();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the procedure is finished. The Procedure may be completed
|
||||
* successfuly or failed and rolledback.
|
||||
* @return true if the procedure is finished. The Procedure may be completed successfully or
|
||||
* rolledback.
|
||||
*/
|
||||
public synchronized boolean isFinished() {
|
||||
switch (state) {
|
||||
case ROLLEDBACK:
|
||||
return true;
|
||||
case FINISHED:
|
||||
return exception == null;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
return isSuccess() || state == ProcedureState.ROLLEDBACK;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -580,7 +590,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
protected synchronized void setFailure(final RemoteProcedureException exception) {
|
||||
this.exception = exception;
|
||||
if (!isFinished()) {
|
||||
setState(ProcedureState.FINISHED);
|
||||
setState(ProcedureState.FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -313,7 +313,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
final NonceKey nonceKey;
|
||||
final long procId;
|
||||
|
||||
if (procIter.isNextCompleted()) {
|
||||
if (procIter.isNextFinished()) {
|
||||
ProcedureInfo proc = procIter.nextAsProcedureInfo();
|
||||
nonceKey = proc.getNonceKey();
|
||||
procId = proc.getProcId();
|
||||
|
@ -351,7 +351,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
HashSet<Procedure> waitingSet = null;
|
||||
procIter.reset();
|
||||
while (procIter.hasNext()) {
|
||||
if (procIter.isNextCompleted()) {
|
||||
if (procIter.isNextFinished()) {
|
||||
procIter.skipNext();
|
||||
continue;
|
||||
}
|
||||
|
@ -397,11 +397,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
waitingSet.add(proc);
|
||||
break;
|
||||
case FINISHED:
|
||||
if (proc.hasException()) {
|
||||
// add the proc to the scheduler to perform the rollback
|
||||
scheduler.addBack(proc);
|
||||
}
|
||||
case FAILED:
|
||||
// add the proc to the scheduler to perform the rollback
|
||||
scheduler.addBack(proc);
|
||||
break;
|
||||
case ROLLEDBACK:
|
||||
case INITIALIZING:
|
||||
|
@ -650,7 +648,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* @return whether the chore is removed, or it will be removed later
|
||||
*/
|
||||
public boolean removeChore(final ProcedureInMemoryChore chore) {
|
||||
chore.setState(ProcedureState.FINISHED);
|
||||
chore.setState(ProcedureState.SUCCESS);
|
||||
return timeoutExecutor.remove(chore);
|
||||
}
|
||||
|
||||
|
@ -1317,7 +1315,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
|
||||
|
||||
// Execute the procedure
|
||||
boolean isSuspended = false;
|
||||
boolean suspended = false;
|
||||
boolean reExecute = false;
|
||||
Procedure[] subprocs = null;
|
||||
do {
|
||||
|
@ -1328,7 +1326,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
subprocs = null;
|
||||
}
|
||||
} catch (ProcedureSuspendedException e) {
|
||||
isSuspended = true;
|
||||
suspended = true;
|
||||
} catch (ProcedureYieldException e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Yield " + procedure + ": " + e.getMessage());
|
||||
|
@ -1358,9 +1356,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
|
||||
timeoutExecutor.add(procedure);
|
||||
} else if (!isSuspended) {
|
||||
} else if (!suspended) {
|
||||
// No subtask, so we are done
|
||||
procedure.setState(ProcedureState.FINISHED);
|
||||
procedure.setState(ProcedureState.SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1369,20 +1367,20 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
|
||||
// allows to kill the executor before something is stored to the wal.
|
||||
// useful to test the procedure recovery.
|
||||
if (testing != null && testing.shouldKillBeforeStoreUpdate(isSuspended)) {
|
||||
if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
|
||||
LOG.debug("TESTING: Kill before store update: " + procedure);
|
||||
stop();
|
||||
return;
|
||||
}
|
||||
|
||||
// Commit the transaction
|
||||
updateStoreOnExec(procStack, procedure, subprocs);
|
||||
|
||||
// if the store is not running we are aborting
|
||||
if (!store.isRunning()) return;
|
||||
|
||||
// Commit the transaction
|
||||
updateStoreOnExec(procStack, procedure, subprocs);
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (procedure.isRunnable() && !isSuspended &&
|
||||
if (procedure.isRunnable() && !suspended &&
|
||||
procedure.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
scheduler.yield(procedure);
|
||||
return;
|
||||
|
|
|
@ -184,7 +184,7 @@ public final class ProcedureUtil {
|
|||
}
|
||||
|
||||
if (proto.hasException()) {
|
||||
assert proc.getState() == ProcedureProtos.ProcedureState.FINISHED ||
|
||||
assert proc.getState() == ProcedureProtos.ProcedureState.FAILED ||
|
||||
proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK :
|
||||
"The procedure must be failed (waiting to rollback) or rolledback";
|
||||
proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
|
||||
|
|
|
@ -70,7 +70,7 @@ public interface ProcedureStore {
|
|||
/**
|
||||
* @return true if the iterator next element is a completed procedure.
|
||||
*/
|
||||
boolean isNextCompleted();
|
||||
boolean isNextFinished();
|
||||
|
||||
/**
|
||||
* Skip the next procedure
|
||||
|
|
|
@ -49,7 +49,7 @@ public class ProcedureWALFormatReader {
|
|||
// - INIT: Procedure submitted by the user (also known as 'root procedure')
|
||||
// - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
|
||||
// - UPDATE: The specified procedure was updated
|
||||
// - DELETE: The procedure was removed (completed/rolledback and result TTL expired)
|
||||
// - DELETE: The procedure was removed (finished/rolledback and result TTL expired)
|
||||
//
|
||||
// In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
|
||||
// We read the WAL from top to bottom, so every time we find an entry of the
|
||||
|
@ -326,15 +326,14 @@ public class ProcedureWALFormatReader {
|
|||
public boolean hasParent() { return proto.hasParentId(); }
|
||||
public boolean isReady() { return ready; }
|
||||
|
||||
public boolean isCompleted() {
|
||||
public boolean isFinished() {
|
||||
if (!hasParent()) {
|
||||
// we only consider 'root' procedures. because for the user 'completed'
|
||||
// means when everything up to the 'root' is complete.
|
||||
// we only consider 'root' procedures. because for the user 'finished'
|
||||
// means when everything up to the 'root' is finished.
|
||||
switch (proto.getState()) {
|
||||
case ROLLEDBACK:
|
||||
case SUCCESS:
|
||||
return true;
|
||||
case FINISHED:
|
||||
return !proto.hasException();
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -387,8 +386,8 @@ public class ProcedureWALFormatReader {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isNextCompleted() {
|
||||
return current != null && current.isCompleted();
|
||||
public boolean isNextFinished() {
|
||||
return current != null && current.isFinished();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -676,8 +675,8 @@ public class ProcedureWALFormatReader {
|
|||
private boolean checkReadyToRun(Entry rootEntry) {
|
||||
assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
|
||||
|
||||
if (rootEntry.isCompleted()) {
|
||||
// if the root procedure is completed, sub-procedures should be gone
|
||||
if (rootEntry.isFinished()) {
|
||||
// if the root procedure is finished, sub-procedures should be gone
|
||||
if (rootEntry.childHead != null) {
|
||||
LOG.error("unexpected active children for root-procedure: " + rootEntry);
|
||||
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
|
||||
|
|
|
@ -409,7 +409,7 @@ public class ProcedureTestingUtility {
|
|||
}
|
||||
|
||||
public void setFinishedState() {
|
||||
setState(ProcedureState.FINISHED);
|
||||
setState(ProcedureState.SUCCESS);
|
||||
}
|
||||
|
||||
public void setData(final byte[] data) {
|
||||
|
@ -523,7 +523,7 @@ public class ProcedureTestingUtility {
|
|||
public void load(ProcedureIterator procIter) throws IOException {
|
||||
while (procIter.hasNext()) {
|
||||
long procId;
|
||||
if (procIter.isNextCompleted()) {
|
||||
if (procIter.isNextFinished()) {
|
||||
ProcedureInfo proc = procIter.nextAsProcedureInfo();
|
||||
procId = proc.getProcId();
|
||||
LOG.debug("loading completed procId=" + procId + ": " + proc);
|
||||
|
|
|
@ -82,7 +82,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
|||
@Override
|
||||
public void load(ProcedureIterator procIter) throws IOException {
|
||||
while (procIter.hasNext()) {
|
||||
if (procIter.isNextCompleted()) {
|
||||
if (procIter.isNextFinished()) {
|
||||
ProcedureInfo proc = procIter.nextAsProcedureInfo();
|
||||
} else {
|
||||
Procedure proc = procIter.nextAsProcedure();
|
||||
|
|
|
@ -61,12 +61,20 @@ public final class ProcedureProtos {
|
|||
ROLLEDBACK(5),
|
||||
/**
|
||||
* <pre>
|
||||
* The procedure execution is completed. may need a rollback if failed.
|
||||
* The procedure execution is completed successfully.
|
||||
* </pre>
|
||||
*
|
||||
* <code>FINISHED = 6;</code>
|
||||
* <code>SUCCESS = 6;</code>
|
||||
*/
|
||||
FINISHED(6),
|
||||
SUCCESS(6),
|
||||
/**
|
||||
* <pre>
|
||||
* The procedure execution is failed, may need to rollback
|
||||
* </pre>
|
||||
*
|
||||
* <code>FAILED = 7;</code>
|
||||
*/
|
||||
FAILED(7),
|
||||
;
|
||||
|
||||
/**
|
||||
|
@ -111,12 +119,20 @@ public final class ProcedureProtos {
|
|||
public static final int ROLLEDBACK_VALUE = 5;
|
||||
/**
|
||||
* <pre>
|
||||
* The procedure execution is completed. may need a rollback if failed.
|
||||
* The procedure execution is completed successfully.
|
||||
* </pre>
|
||||
*
|
||||
* <code>FINISHED = 6;</code>
|
||||
* <code>SUCCESS = 6;</code>
|
||||
*/
|
||||
public static final int FINISHED_VALUE = 6;
|
||||
public static final int SUCCESS_VALUE = 6;
|
||||
/**
|
||||
* <pre>
|
||||
* The procedure execution is failed, may need to rollback
|
||||
* </pre>
|
||||
*
|
||||
* <code>FAILED = 7;</code>
|
||||
*/
|
||||
public static final int FAILED_VALUE = 7;
|
||||
|
||||
|
||||
public final int getNumber() {
|
||||
|
@ -138,7 +154,8 @@ public final class ProcedureProtos {
|
|||
case 3: return WAITING;
|
||||
case 4: return WAITING_TIMEOUT;
|
||||
case 5: return ROLLEDBACK;
|
||||
case 6: return FINISHED;
|
||||
case 6: return SUCCESS;
|
||||
case 7: return FAILED;
|
||||
default: return null;
|
||||
}
|
||||
}
|
||||
|
@ -7752,11 +7769,12 @@ public final class ProcedureProtos {
|
|||
"DURE_WAL_INIT\020\002\022\030\n\024PROCEDURE_WAL_INSERT\020" +
|
||||
"\003\022\030\n\024PROCEDURE_WAL_UPDATE\020\004\022\030\n\024PROCEDURE" +
|
||||
"_WAL_DELETE\020\005\022\031\n\025PROCEDURE_WAL_COMPACT\020\006" +
|
||||
"*p\n\016ProcedureState\022\020\n\014INITIALIZING\020\001\022\014\n\010" +
|
||||
"*{\n\016ProcedureState\022\020\n\014INITIALIZING\020\001\022\014\n\010" +
|
||||
"RUNNABLE\020\002\022\013\n\007WAITING\020\003\022\023\n\017WAITING_TIMEO" +
|
||||
"UT\020\004\022\016\n\nROLLEDBACK\020\005\022\014\n\010FINISHED\020\006BL\n1or",
|
||||
"g.apache.hadoop.hbase.shaded.protobuf.ge" +
|
||||
"neratedB\017ProcedureProtosH\001\210\001\001\240\001\001"
|
||||
"UT\020\004\022\016\n\nROLLEDBACK\020\005\022\013\n\007SUCCESS\020\006\022\n\n\006FAI",
|
||||
"LED\020\007BL\n1org.apache.hadoop.hbase.shaded." +
|
||||
"protobuf.generatedB\017ProcedureProtosH\001\210\001\001" +
|
||||
"\240\001\001"
|
||||
};
|
||||
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
|
||||
|
|
|
@ -31,7 +31,8 @@ enum ProcedureState {
|
|||
WAITING = 3; // The procedure is waiting on children to be completed
|
||||
WAITING_TIMEOUT = 4; // The procedure is waiting a timout or an external event
|
||||
ROLLEDBACK = 5; // The procedure failed and was rolledback
|
||||
FINISHED = 6; // The procedure execution is completed. may need a rollback if failed.
|
||||
SUCCESS = 6; // The procedure execution is completed successfully.
|
||||
FAILED = 7; // The procedure execution is failed, may need to rollback
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
|
@ -212,7 +211,7 @@ public class TestProcedureAdmin {
|
|||
assertTrue(procInfo.getProcState() == ProcedureState.RUNNABLE);
|
||||
found = true;
|
||||
} else {
|
||||
assertTrue(procInfo.getProcState() == ProcedureState.FINISHED);
|
||||
assertTrue(procInfo.getProcState() == ProcedureState.SUCCESS);
|
||||
}
|
||||
}
|
||||
assertTrue(found);
|
||||
|
@ -223,7 +222,7 @@ public class TestProcedureAdmin {
|
|||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||
listProcedures = procExec.listProcedures();
|
||||
for (ProcedureInfo procInfo: listProcedures) {
|
||||
assertTrue(procInfo.getProcState() == ProcedureState.FINISHED);
|
||||
assertTrue(procInfo.getProcState() == ProcedureState.SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue