HBASE-20846 Restore procedure locks when master restarts

This commit is contained in:
zhangduo 2018-07-22 15:10:06 +08:00
parent e44f506694
commit f3f17fa111
35 changed files with 624 additions and 551 deletions

View File

@ -163,8 +163,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
return null;
}
}
final Procedure pollResult = dequeue();
pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
return pollResult;

View File

@ -24,8 +24,9 @@ import org.apache.yetus.audience.InterfaceAudience;
* Vessel that carries a Procedure and a timeout.
*/
@InterfaceAudience.Private
class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> {
public DelayedProcedure(Procedure<?> procedure) {
class DelayedProcedure<TEnvironment>
extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<TEnvironment>> {
public DelayedProcedure(Procedure<TEnvironment> procedure) {
super(procedure, procedure.getTimeoutTimestamp());
}
}

View File

@ -22,76 +22,94 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
* Base Procedure class responsible for Procedure Metadata;
* e.g. state, submittedTime, lastUpdate, stack-indexes, etc.
*
* <p>Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then
* the ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done.
* Execute may be called multiple times in the case of failure or a restart, so code must be
* idempotent. The return from an execute call is either: null to indicate we are done;
* ourself if there is more to do; or, a set of sub-procedures that need to
* be run to completion before the framework resumes our execution.
*
* <p>The ProcedureExecutor keeps its
* notion of Procedure State in the Procedure itself; e.g. it stamps the Procedure as INITIALIZING,
* RUNNABLE, SUCCESS, etc. Here are some of the States defined in the ProcedureState enum from
* protos:
*<ul>
* <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure
* may or may not have rolled back yet. Any procedure in FAILED state will be eventually moved
* to ROLLEDBACK state.</li>
*
* Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
* stack-indexes, etc.
* <p/>
* Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
* ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
* be called multiple times in the case of failure or a restart, so code must be idempotent. The
* return from an execute call is either: null to indicate we are done; ourself if there is more to
* do; or, a set of sub-procedures that need to be run to completion before the framework resumes
* our execution.
* <p/>
* The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
* the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
* ProcedureState enum from protos:
* <ul>
* <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
* or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
* ROLLEDBACK state.</li>
* <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
*
* <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
* condition when scheduler/ executor will drop procedure from further processing is when procedure
* state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
*
* <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
* ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
*</ul>
* NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep
* their own state. This can lead to confusion. Try to keep the two distinct.
*
* <p>rollback() is called when the procedure or one of the sub-procedures
* has failed. The rollback step is supposed to cleanup the resources created
* during the execute() step. In case of failure and restart, rollback() may be
* called multiple times, so again the code must be idempotent.
*
* <p>Procedure can be made respect a locking regime. It has acquire/release methods as
* well as an {@link #hasLock(Object)}. The lock implementation is up to the implementor.
* If an entity needs to be locked for the life of a procedure -- not just the calls to
* execute -- then implementations should say so with the {@link #holdLock(Object)}
* method.
*
* <p>Procedures can be suspended or put in wait state with a callback that gets executed on
* </ul>
* NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
* own state. This can lead to confusion. Try to keep the two distinct.
* <p/>
* rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
* step is supposed to cleanup the resources created during the execute() step. In case of failure
* and restart, rollback() may be called multiple times, so again the code must be idempotent.
* <p/>
* Procedure can be made respect a locking regime. It has acquire/release methods as well as an
* {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
* locked for the life of a procedure -- not just the calls to execute -- then implementations
* should say so with the {@link #holdLock(Object)} method.
* <p/>
* And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
* implementation is a bit tricky so we add some comments hrre about it.
* <ul>
* <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
* whether we have the lock. We will set it to {@code true} in
* {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
* {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
* more.</li>
* <li>Also added a locked field in the proto message. When storing, the field will be set according
* to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
* called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
* message is {@code true}.</li>
* <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
* {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
* need to wait until master is initialized. So the solution here is that, we introduced a new
* method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
* related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
* to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
* {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
* when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
* {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
* {@code true}, when we just set the {@link #locked} field to true and return, without actually
* calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
* </ul>
* <p/>
* Procedures can be suspended or put in wait state with a callback that gets executed on
* Procedure-specified timeout. See {@link #setTimeout(int)}}, and
* {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the
* TestTimeoutEventProcedure class for an example usage.</p>
*
* <p>There are hooks for collecting metrics on submit of the procedure and on finish.
* See {@link #updateMetricsOnSubmit(Object)} and
* {@link #updateMetricsOnFinish(Object, long, boolean)}.
* {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
* class for an example usage.
* </p>
* <p/>
* There are hooks for collecting metrics on submit of the procedure and on finish. See
* {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
public static final long NO_PROC_ID = -1;
@ -122,6 +140,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
private volatile byte[] result = null;
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;
/**
* The main code of the procedure. It must be idempotent since execute()
* may be called multiple times in case of machine failure in the middle
@ -170,7 +192,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* be able to resume on failure.
* @param serializer stores the serializable state
*/
protected abstract void serializeStateData(final ProcedureStateSerializer serializer)
protected abstract void serializeStateData(ProcedureStateSerializer serializer)
throws IOException;
/**
@ -178,52 +200,65 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* state.
* @param serializer contains the serialized state
*/
protected abstract void deserializeStateData(final ProcedureStateSerializer serializer)
protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException;
/**
* The user should override this method if they need a lock on an Entity.
* A lock can be anything, and it is up to the implementor. The Procedure
* Framework will call this method just before it invokes {@link #execute(Object)}.
* It calls {@link #releaseLock(Object)} after the call to execute.
*
* <p>If you need to hold the lock for the life of the Procedure -- i.e. you do not
* want any other Procedure interfering while this Procedure is running, see
* {@link #holdLock(Object)}.
*
* <p>Example: in our Master we can execute request in parallel for different tables.
* We can create t1 and create t2 and these creates can be executed at the same time.
* Anything else on t1/t2 is queued waiting that specific table create to happen.
*
* <p>There are 3 LockState:
* <ul><li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is
* ready to execute.</li>
* <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework
* should take care of readding the procedure back to the runnable set for retry</li>
* <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will
* take care of readding the procedure back to the runnable set when the lock is available.
* </li></ul>
* The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
* call us to determine whether we need to wait for initialization, second, it will call
* {@link #acquireLock(Object)} to actually handle the lock for this procedure.
* <p/>
* This is because that when master restarts, we need to restore the lock state for all the
* procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
* {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
* of the initialization!), so we need to split the code into two steps, and when restore, we just
* restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
* @return true means we need to wait until the environment has been initialized, otherwise true.
*/
protected boolean waitInitialized(TEnvironment env) {
return false;
}
/**
* The user should override this method if they need a lock on an Entity. A lock can be anything,
* and it is up to the implementor. The Procedure Framework will call this method just before it
* invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
* execute.
* <p/>
* If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
* Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
* <p/>
* Example: in our Master we can execute request in parallel for different tables. We can create
* t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
* queued waiting that specific table create to happen.
* <p/>
* There are 3 LockState:
* <ul>
* <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
* execute.</li>
* <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
* take care of readding the procedure back to the runnable set for retry</li>
* <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
* care of readding the procedure back to the runnable set when the lock is available.</li>
* </ul>
* @return the lock state as described above.
*/
protected LockState acquireLock(final TEnvironment env) {
protected LockState acquireLock(TEnvironment env) {
return LockState.LOCK_ACQUIRED;
}
/**
* The user should override this method, and release lock if necessary.
*/
protected void releaseLock(final TEnvironment env) {
protected void releaseLock(TEnvironment env) {
// no-op
}
/**
* Used to keep the procedure lock even when the procedure is yielding or suspended.
* Must implement {@link #hasLock(Object)} if you want to hold the lock for life
* of the Procedure.
* @see #hasLock(Object)
* @return true if the procedure should hold on the lock until completionCleanup()
*/
protected boolean holdLock(final TEnvironment env) {
protected boolean holdLock(TEnvironment env) {
return false;
}
@ -235,8 +270,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @see #holdLock(Object)
* @return true if the procedure has the lock, false otherwise.
*/
protected boolean hasLock(final TEnvironment env) {
return false;
protected final boolean hasLock() {
return locked;
}
/**
@ -245,7 +280,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* operation before replay.
* e.g. failing the procedure if the state on replay may be unknown.
*/
protected void beforeReplay(final TEnvironment env) {
protected void beforeReplay(TEnvironment env) {
// no-op
}
@ -253,7 +288,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called when the procedure is ready to be added to the queue after
* the loading/replay operation.
*/
protected void afterReplay(final TEnvironment env) {
protected void afterReplay(TEnvironment env) {
// no-op
}
@ -263,7 +298,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* This operation will not be retried on failure. If a procedure took a lock,
* it will have been released when this method runs.
*/
protected void completionCleanup(final TEnvironment env) {
protected void completionCleanup(TEnvironment env) {
// no-op
}
@ -275,7 +310,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @return Return true if the executor should yield on completion of an execution step.
* Defaults to return false.
*/
protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
protected boolean isYieldAfterExecutionStep(TEnvironment env) {
return false;
}
@ -288,7 +323,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @return true if the executor should wait the client ack for the result.
* Defaults to return true.
*/
protected boolean shouldWaitClientAck(final TEnvironment env) {
protected boolean shouldWaitClientAck(TEnvironment env) {
return true;
}
@ -298,7 +333,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @param env The environment passed to the procedure executor
* @return Container object for procedure related metric
*/
protected ProcedureMetrics getProcedureMetrics(final TEnvironment env) {
protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
return null;
}
@ -308,7 +343,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
* {@link ProcedureMetrics}.
*/
protected void updateMetricsOnSubmit(final TEnvironment env) {
protected void updateMetricsOnSubmit(TEnvironment env) {
ProcedureMetrics metrics = getProcedureMetrics(env);
if (metrics == null) {
return;
@ -322,21 +357,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* This function will be called just after procedure execution is finished. Override this method
* to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)}
* returns non-null {@link ProcedureMetrics}, the default implementation adds runtime of a
* procedure to a time histogram for successfully completed procedures. Increments failed
* counter for failed procedures.
*
* TODO: As any of the sub-procedures on failure rolls back all procedures in the stack,
* including successfully finished siblings, this function may get called twice in certain
* cases for certain procedures. Explore further if this can be called once.
*
* to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
* non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
* time histogram for successfully completed procedures. Increments failed counter for failed
* procedures.
* <p/>
* TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
* successfully finished siblings, this function may get called twice in certain cases for certain
* procedures. Explore further if this can be called once.
* @param env The environment passed to the procedure executor
* @param runtime Runtime of the procedure in milliseconds
* @param success true if procedure is completed successfully
*/
protected void updateMetricsOnFinish(final TEnvironment env, final long runtime,
boolean success) {
protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
ProcedureMetrics metrics = getProcedureMetrics(env);
if (metrics == null) {
return;
@ -362,8 +395,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
/**
* Build the StringBuilder for the simple form of
* procedure string.
* Build the StringBuilder for the simple form of procedure string.
* @return the StringBuilder
*/
protected StringBuilder toStringSimpleSB() {
@ -389,6 +421,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
sb.append(", state="); // pState for Procedure State as opposed to any other kind.
toStringState(sb);
sb.append(", hasLock=").append(locked);
if (hasException()) {
sb.append(", exception=" + getException());
}
@ -400,8 +434,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
/**
* Extend the toString() information with more procedure
* details
* Extend the toString() information with more procedure details
*/
public String toStringDetails() {
final StringBuilder sb = toStringSimpleSB();
@ -429,8 +462,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
/**
* Called from {@link #toString()} when interpolating {@link Procedure} State.
* Allows decorating generic Procedure State with Procedure particulars.
* Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
* generic Procedure State with Procedure particulars.
* @param builder Append current {@link ProcedureState}
*/
protected void toStringState(StringBuilder builder) {
@ -493,8 +526,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called by the ProcedureExecutor to assign the ID to the newly created procedure.
*/
@VisibleForTesting
@InterfaceAudience.Private
protected void setProcId(final long procId) {
protected void setProcId(long procId) {
this.procId = procId;
this.submittedTime = EnvironmentEdgeManager.currentTime();
setState(ProcedureState.RUNNABLE);
@ -503,13 +535,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor to assign the parent to the newly created procedure.
*/
@InterfaceAudience.Private
protected void setParentProcId(final long parentProcId) {
protected void setParentProcId(long parentProcId) {
this.parentProcId = parentProcId;
}
@InterfaceAudience.Private
protected void setRootProcId(final long rootProcId) {
protected void setRootProcId(long rootProcId) {
this.rootProcId = rootProcId;
}
@ -517,18 +547,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called by the ProcedureExecutor to set the value to the newly created procedure.
*/
@VisibleForTesting
@InterfaceAudience.Private
protected void setNonceKey(final NonceKey nonceKey) {
protected void setNonceKey(NonceKey nonceKey) {
this.nonceKey = nonceKey;
}
@VisibleForTesting
@InterfaceAudience.Private
public void setOwner(final String owner) {
public void setOwner(String owner) {
this.owner = StringUtils.isEmpty(owner) ? null : owner;
}
public void setOwner(final User owner) {
public void setOwner(User owner) {
assert owner != null : "expected owner to be not null";
setOwner(owner.getShortName());
}
@ -537,8 +565,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@InterfaceAudience.Private
protected void setSubmittedTime(final long submittedTime) {
protected void setSubmittedTime(long submittedTime) {
this.submittedTime = submittedTime;
}
@ -548,7 +575,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* @param timeout timeout interval in msec
*/
protected void setTimeout(final int timeout) {
protected void setTimeout(int timeout) {
this.timeout = timeout;
}
@ -567,15 +594,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@InterfaceAudience.Private
protected void setLastUpdate(final long lastUpdate) {
protected void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
/**
* Called by ProcedureExecutor after each time a procedure step is executed.
*/
@InterfaceAudience.Private
protected void updateTimestamp() {
this.lastUpdate = EnvironmentEdgeManager.currentTime();
}
@ -590,7 +615,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* the procedure is in the waiting queue.
* @return the timestamp of the next timeout.
*/
@InterfaceAudience.Private
protected long getTimeoutTimestamp() {
return getLastUpdate() + getTimeout();
}
@ -616,10 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* The procedure may leave a "result" on completion.
* @param result the serialized result that will be passed to the client
*/
protected void setResult(final byte[] result) {
protected void setResult(byte[] result) {
this.result = result;
}
/**
* Will only be called when loading procedures from procedure store, where we need to record
* whether the procedure has already held a lock. Later we will call
* {@link #doAcquireLock(Object)} to actually acquire the lock.
*/
final void lockedWhenLoading() {
this.lockedWhenLoading = true;
}
// ==============================================================================================
// Runtime state, updated every operation by the ProcedureExecutor
//
@ -677,13 +710,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
@VisibleForTesting
@InterfaceAudience.Private
protected synchronized void setState(final ProcedureState state) {
this.state = state;
updateTimestamp();
}
@InterfaceAudience.Private
public synchronized ProcedureState getState() {
return state;
}
@ -705,10 +736,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
* @return true to let the framework handle the timeout as abort,
* false in case the procedure handled the timeout itself.
* @return true to let the framework handle the timeout as abort, false in case the procedure
* handled the timeout itself.
*/
protected synchronized boolean setTimeoutFailure(final TEnvironment env) {
protected synchronized boolean setTimeoutFailure(TEnvironment env) {
if (state == ProcedureState.WAITING_TIMEOUT) {
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
setFailure("ProcedureExecutor", new TimeoutIOException(
@ -729,8 +760,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor on procedure-load to restore the latch state
*/
@InterfaceAudience.Private
protected synchronized void setChildrenLatch(final int numChildren) {
protected synchronized void setChildrenLatch(int numChildren) {
this.childrenLatch = numChildren;
if (LOG.isTraceEnabled()) {
LOG.trace("CHILD LATCH INCREMENT SET " +
@ -741,7 +771,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor on procedure-load to restore the latch state
*/
@InterfaceAudience.Private
protected synchronized void incChildrenLatch() {
// TODO: can this be inferred from the stack? I think so...
this.childrenLatch++;
@ -753,7 +782,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
*/
@InterfaceAudience.Private
private synchronized boolean childrenCountDown() {
assert childrenLatch > 0: this;
boolean b = --childrenLatch == 0;
@ -770,17 +798,18 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
*/
synchronized boolean tryRunnable() {
// Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
boolean b = getState() == ProcedureState.WAITING && childrenCountDown();
if (b) setState(ProcedureState.RUNNABLE);
return b;
if (getState() == ProcedureState.WAITING && childrenCountDown()) {
setState(ProcedureState.RUNNABLE);
return true;
} else {
return false;
}
}
@InterfaceAudience.Private
protected synchronized boolean hasChildren() {
return childrenLatch > 0;
}
@InterfaceAudience.Private
protected synchronized int getChildrenLatch() {
return childrenLatch;
}
@ -789,7 +818,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called by the RootProcedureState on procedure execution.
* Each procedure store its stack-index positions.
*/
@InterfaceAudience.Private
protected synchronized void addStackIndex(final int index) {
if (stackIndexes == null) {
stackIndexes = new int[] { index };
@ -800,7 +828,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
}
@InterfaceAudience.Private
protected synchronized boolean removeStackIndex() {
if (stackIndexes != null && stackIndexes.length > 1) {
stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
@ -815,7 +842,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@InterfaceAudience.Private
protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
this.stackIndexes = new int[stackIndexes.size()];
for (int i = 0; i < this.stackIndexes.length; ++i) {
@ -823,12 +849,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
}
@InterfaceAudience.Private
protected synchronized boolean wasExecuted() {
return stackIndexes != null;
}
@InterfaceAudience.Private
protected synchronized int[] getStackIndexes() {
return stackIndexes;
}
@ -840,10 +864,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
* @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
* skip out without changing states or releasing any locks held.
* skip out without changing states or releasing any locks held.
*/
@InterfaceAudience.Private
protected Procedure<TEnvironment>[] doExecute(final TEnvironment env)
protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
updateTimestamp();
@ -856,8 +879,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Internal method called by the ProcedureExecutor that starts the user-level code rollback().
*/
@InterfaceAudience.Private
protected void doRollback(final TEnvironment env)
protected void doRollback(TEnvironment env)
throws IOException, InterruptedException {
try {
updateTimestamp();
@ -867,19 +889,60 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
}
final void restoreLock(TEnvironment env) {
if (!lockedWhenLoading) {
LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
return;
}
LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
LockState state = acquireLock(env);
assert state == LockState.LOCK_ACQUIRED;
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
*/
@InterfaceAudience.Private
protected LockState doAcquireLock(final TEnvironment env) {
return acquireLock(env);
final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
if (waitInitialized(env)) {
return LockState.LOCK_EVENT_WAIT;
}
if (lockedWhenLoading) {
// reset it so we will not consider it anymore
lockedWhenLoading = false;
locked = true;
// Here we return without persist the locked state, as lockedWhenLoading is true means
// that the locked field of the procedure stored in procedure store is true, so we do not need
// to store it again.
return LockState.LOCK_ACQUIRED;
}
LockState state = acquireLock(env);
if (state == LockState.LOCK_ACQUIRED) {
locked = true;
// persist that we have held the lock. This must be done before we actually execute the
// procedure, otherwise when restarting, we may consider the procedure does not have a lock,
// but it may have already done some changes as we have already executed it, and if another
// procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
// not expect that another procedure can be executed in the middle.
store.update(this);
}
return state;
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
*/
@InterfaceAudience.Private
protected void doReleaseLock(final TEnvironment env) {
final void doReleaseLock(TEnvironment env, ProcedureStore store) {
locked = false;
// persist that we have released the lock. This must be done before we actually release the
// lock. Another procedure may take this lock immediately after we release the lock, and if we
// crash before persist the information that we have already released the lock, then when
// restarting there will be two procedures which both have the lock and cause problems.
if (getState() != ProcedureState.ROLLEDBACK) {
// If the state is ROLLEDBACK, it means that we have already deleted the procedure from
// procedure store, so do not need to log the release operation any more.
store.update(this);
}
releaseLock(env);
}
@ -896,7 +959,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Get an hashcode for the specified Procedure ID
* @return the hashcode for the specified procId
*/
public static long getProcIdHashCode(final long procId) {
public static long getProcIdHashCode(long procId) {
long h = procId;
h ^= h >> 16;
h *= 0x85ebca6b;
@ -906,15 +969,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
return h;
}
/*
/**
* Helper to lookup the root Procedure ID given a specified procedure.
*/
@InterfaceAudience.Private
protected static Long getRootProcedureId(final Map<Long, Procedure> procedures,
Procedure<?> proc) {
protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
Procedure<T> proc) {
while (proc.hasParent()) {
proc = procedures.get(proc.getParentProcId());
if (proc == null) return null;
if (proc == null) {
return null;
}
}
return proc.getProcId();
}
@ -924,7 +988,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @param b the second procedure to be compared.
* @return true if the two procedures have the same parent
*/
public static boolean haveSameParent(final Procedure<?> a, final Procedure<?> b) {
public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
}
}

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -48,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,7 +72,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* and get the result via getResult(procId)
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureExecutor<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
@ -108,16 +108,16 @@ public class ProcedureExecutor<TEnvironment> {
void procedureFinished(long procId);
}
private static class CompletedProcedureRetainer {
private final Procedure<?> procedure;
private static final class CompletedProcedureRetainer<TEnvironment> {
private final Procedure<TEnvironment> procedure;
private long clientAckTime;
public CompletedProcedureRetainer(Procedure<?> procedure) {
public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
this.procedure = procedure;
clientAckTime = -1;
}
public Procedure<?> getProcedure() {
public Procedure<TEnvironment> getProcedure() {
return procedure;
}
@ -172,13 +172,13 @@ public class ProcedureExecutor<TEnvironment> {
private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
private static final int DEFAULT_BATCH_SIZE = 32;
private final Map<Long, CompletedProcedureRetainer> completed;
private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
private Configuration conf;
public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
final Map<Long, CompletedProcedureRetainer> completedMap,
public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
// set the timeout interval that triggers the periodic-procedure
super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
@ -205,10 +205,11 @@ public class ProcedureExecutor<TEnvironment> {
int batchCount = 0;
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator();
final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
completed.entrySet().iterator();
while (it.hasNext() && store.isRunning()) {
final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next();
final CompletedProcedureRetainer retainer = entry.getValue();
final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
final Procedure<?> proc = retainer.getProcedure();
// TODO: Select TTL based on Procedure type
@ -240,28 +241,32 @@ public class ProcedureExecutor<TEnvironment> {
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
* The user of ProcedureExecutor should call getResult(procId) to get the result.
*/
private final ConcurrentHashMap<Long, CompletedProcedureRetainer> completed = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
new ConcurrentHashMap<>();
/**
* Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
* The RootProcedureState contains the execution stack of the Root-Procedure,
* It is added to the map by submitProcedure() and removed on procedure completion.
*/
private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
new ConcurrentHashMap<>();
/**
* Helper map to lookup the live procedures by ID.
* This map contains every procedure. root-procedures and subprocedures.
*/
private final ConcurrentHashMap<Long, Procedure> procedures = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
new ConcurrentHashMap<>();
/**
* Helper map to lookup whether the procedure already issued from the same client.
* This map contains every root procedure.
* Helper map to lookup whether the procedure already issued from the same client. This map
* contains every root procedure.
*/
private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
new CopyOnWriteArrayList<>();
private Configuration conf;
@ -287,7 +292,7 @@ public class ProcedureExecutor<TEnvironment> {
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (Should be ok).
*/
private TimeoutExecutorThread timeoutExecutor;
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
private int corePoolSize;
private int maxPoolSize;
@ -357,27 +362,68 @@ public class ProcedureExecutor<TEnvironment> {
});
}
private void loadProcedures(final ProcedureIterator procIter,
final boolean abortOnCorruption) throws IOException {
final boolean debugEnabled = LOG.isDebugEnabled();
private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
proc.restoreLock(getEnvironment());
restored.add(proc.getProcId());
}
private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
while (!stack.isEmpty()) {
restoreLock(stack.pop(), restored);
}
}
// Restore the locks for all the procedures.
// Notice that we need to restore the locks starting from the root proc, otherwise there will be
// problem that a sub procedure may hold the exclusive lock first and then we are stuck when
// calling the acquireLock method for the parent procedure.
// The algorithm is straight-forward:
// 1. Use a set to record the procedures which locks have already been restored.
// 2. Use a stack to store the hierarchy of the procedures
// 3. For all the procedure, we will first try to find its parent and push it into the stack,
// unless
// a. We have no parent, i.e, we are the root procedure
// b. The lock has already been restored(by checking the set introduced in #1)
// then we start to pop the stack and call acquireLock for each procedure.
// Notice that this should be done for all procedures, not only the ones in runnableList.
private void restoreLocks() {
Set<Long> restored = new HashSet<>();
Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
procedures.values().forEach(proc -> {
for (;;) {
if (restored.contains(proc.getProcId())) {
restoreLocks(stack, restored);
return;
}
if (!proc.hasParent()) {
restoreLock(proc, restored);
restoreLocks(stack, restored);
return;
}
stack.push(proc);
proc = procedures.get(proc.getParentProcId());
}
});
}
private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
throws IOException {
// 1. Build the rollback stack
int runnablesCount = 0;
int failedCount = 0;
while (procIter.hasNext()) {
boolean finished = procIter.isNextFinished();
Procedure proc = procIter.next();
Procedure<TEnvironment> proc = procIter.next();
NonceKey nonceKey = proc.getNonceKey();
long procId = proc.getProcId();
if (finished) {
completed.put(proc.getProcId(), new CompletedProcedureRetainer(proc));
if (debugEnabled) {
LOG.debug("Completed " + proc);
}
completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
LOG.debug("Completed {}", proc);
} else {
if (!proc.hasParent()) {
assert !proc.isFinished() : "unexpected finished procedure";
rollbackStack.put(proc.getProcId(), new RootProcedureState());
rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
}
// add the procedure to the map
@ -386,6 +432,8 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.getState() == ProcedureState.RUNNABLE) {
runnablesCount++;
} else if (proc.getState() == ProcedureState.FAILED) {
failedCount++;
}
}
@ -396,8 +444,19 @@ public class ProcedureExecutor<TEnvironment> {
}
// 2. Initialize the stacks
final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
HashSet<Procedure> waitingSet = null;
// In the old implementation, for procedures in FAILED state, we will push it into the
// ProcedureScheduler directly to execute the rollback. But this does not work after we
// introduce the restore lock stage.
// For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
// then when a procedure which has lock access, for example, a sub procedure of the procedure
// which has the xlock, is pushed into the scheduler, we will add the queue back to let the
// workers poll from it. The assumption here is that, the procedure which has the xlock should
// have been polled out already, so when loading we can not add the procedure to scheduler first
// and then call acquireLock, since the procedure is still in the queue, and since we will
// remove the queue from runQueue, then no one can poll it out, then there is a dead lock
List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount);
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
Set<Procedure<TEnvironment>> waitingSet = null;
procIter.reset();
while (procIter.hasNext()) {
if (procIter.isNextFinished()) {
@ -405,12 +464,10 @@ public class ProcedureExecutor<TEnvironment> {
continue;
}
Procedure proc = procIter.next();
Procedure<TEnvironment> proc = procIter.next();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
if (debugEnabled) {
LOG.debug(String.format("Loading %s", proc));
}
LOG.debug("Loading {}", proc);
Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
@ -420,14 +477,14 @@ public class ProcedureExecutor<TEnvironment> {
}
if (proc.hasParent()) {
Procedure parent = procedures.get(proc.getParentProcId());
Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
// corrupted procedures are handled later at step 3
if (parent != null && !proc.isFinished()) {
parent.incChildrenLatch();
}
}
RootProcedureState procStack = rollbackStack.get(rootProcId);
RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
procStack.loadStack(proc);
proc.setRootProcId(rootProcId);
@ -447,8 +504,7 @@ public class ProcedureExecutor<TEnvironment> {
waitingSet.add(proc);
break;
case FAILED:
// add the proc to the scheduler to perform the rollback
scheduler.addBack(proc);
failedList.add(proc);
break;
case ROLLEDBACK:
case INITIALIZING:
@ -462,13 +518,14 @@ public class ProcedureExecutor<TEnvironment> {
// 3. Validate the stacks
int corruptedCount = 0;
Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack =
rollbackStack.entrySet().iterator();
while (itStack.hasNext()) {
Map.Entry<Long, RootProcedureState> entry = itStack.next();
RootProcedureState procStack = entry.getValue();
Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next();
RootProcedureState<TEnvironment> procStack = entry.getValue();
if (procStack.isValid()) continue;
for (Procedure proc: procStack.getSubproceduresStack()) {
for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) {
LOG.error("Corrupted " + proc);
procedures.remove(proc.getProcId());
runnableList.remove(proc);
@ -484,30 +541,22 @@ public class ProcedureExecutor<TEnvironment> {
// 4. Push the procedures to the timeout executor
if (waitingSet != null && !waitingSet.isEmpty()) {
for (Procedure proc: waitingSet) {
for (Procedure<TEnvironment> proc: waitingSet) {
proc.afterReplay(getEnvironment());
timeoutExecutor.add(proc);
}
}
// 5. Push the procedure to the scheduler
if (!runnableList.isEmpty()) {
// TODO: See ProcedureWALFormatReader#hasFastStartSupport
// some procedure may be started way before this stuff.
for (int i = runnableList.size() - 1; i >= 0; --i) {
Procedure proc = runnableList.get(i);
proc.afterReplay(getEnvironment());
if (!proc.hasParent()) {
sendProcedureLoadedNotification(proc.getProcId());
}
if (proc.wasExecuted()) {
scheduler.addFront(proc);
} else {
// if it was not in execution, it can wait.
scheduler.addBack(proc);
}
// 5. restore locks
restoreLocks();
// 6. Push the procedure to the scheduler
failedList.forEach(scheduler::addBack);
runnableList.forEach(p -> {
p.afterReplay(getEnvironment());
if (!p.hasParent()) {
sendProcedureLoadedNotification(p.getProcId());
}
}
scheduler.addBack(p);
});
}
/**
@ -529,7 +578,7 @@ public class ProcedureExecutor<TEnvironment> {
corePoolSize, maxPoolSize);
this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
// Create the workers
workerId.set(0);
@ -581,7 +630,7 @@ public class ProcedureExecutor<TEnvironment> {
timeoutExecutor.add(new WorkerMonitor());
// Add completed cleaner chore
addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
}
public void stop() {
@ -686,7 +735,7 @@ public class ProcedureExecutor<TEnvironment> {
* Add a chore procedure to the executor
* @param chore the chore to add
*/
public void addChore(final ProcedureInMemoryChore chore) {
public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
chore.setState(ProcedureState.WAITING_TIMEOUT);
timeoutExecutor.add(chore);
}
@ -696,7 +745,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param chore the chore to remove
* @return whether the chore is removed, or it will be removed later
*/
public boolean removeChore(final ProcedureInMemoryChore chore) {
public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
chore.setState(ProcedureState.SUCCESS);
return timeoutExecutor.remove(chore);
}
@ -830,17 +879,21 @@ public class ProcedureExecutor<TEnvironment> {
* @param procOwner name of the owner of the procedure, used to inform the user
* @param exception the failure to report to the user
*/
public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
final User procOwner, final IOException exception) {
if (nonceKey == null) return;
public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
IOException exception) {
if (nonceKey == null) {
return;
}
final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
if (procId == null || completed.containsKey(procId)) return;
Long procId = nonceKeysToProcIdsMap.get(nonceKey);
if (procId == null || completed.containsKey(procId)) {
return;
}
Procedure<?> proc = new FailedProcedure(procId.longValue(),
procName, procOwner, nonceKey, exception);
Procedure<TEnvironment> proc =
new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc));
completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
}
// ==========================================================================
@ -851,7 +904,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param proc the new procedure to execute.
* @return the procedure id, that can be used to monitor the operation
*/
public long submitProcedure(final Procedure proc) {
public long submitProcedure(Procedure<TEnvironment> proc) {
return submitProcedure(proc, null);
}
@ -863,7 +916,7 @@ public class ProcedureExecutor<TEnvironment> {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification = "FindBugs is blind to the check-for-null")
public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
Preconditions.checkArgument(lastProcId.get() >= 0);
prepareProcedure(proc);
@ -883,9 +936,7 @@ public class ProcedureExecutor<TEnvironment> {
// Commit the transaction
store.insert(proc, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Stored " + proc);
}
LOG.debug("Stored {}", proc);
// Add the procedure to the executor
return pushProcedure(proc);
@ -896,7 +947,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param procs the new procedures to execute.
*/
// TODO: Do we need to take nonces here?
public void submitProcedures(final Procedure[] procs) {
public void submitProcedures(Procedure<TEnvironment>[] procs) {
Preconditions.checkArgument(lastProcId.get() >= 0);
if (procs == null || procs.length <= 0) {
return;
@ -919,7 +970,7 @@ public class ProcedureExecutor<TEnvironment> {
}
}
private Procedure prepareProcedure(final Procedure proc) {
private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
if (this.checkOwnerSet) {
@ -928,14 +979,14 @@ public class ProcedureExecutor<TEnvironment> {
return proc;
}
private long pushProcedure(final Procedure proc) {
private long pushProcedure(Procedure<TEnvironment> proc) {
final long currentProcId = proc.getProcId();
// Update metrics on start of a procedure
proc.updateMetricsOnSubmit(getEnvironment());
// Create the rollback stack for the procedure
RootProcedureState stack = new RootProcedureState();
RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
rollbackStack.put(currentProcId, stack);
// Submit the new subprocedures
@ -952,7 +1003,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param procId the procedure to abort
* @return true if the procedure exists and has received the abort, otherwise false.
*/
public boolean abort(final long procId) {
public boolean abort(long procId) {
return abort(procId, true);
}
@ -963,8 +1014,8 @@ public class ProcedureExecutor<TEnvironment> {
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if the procedure exists and has received the abort, otherwise false.
*/
public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
final Procedure proc = procedures.get(procId);
public boolean abort(long procId, boolean mayInterruptIfRunning) {
Procedure<TEnvironment> proc = procedures.get(procId);
if (proc != null) {
if (!mayInterruptIfRunning && proc.wasExecuted()) {
return false;
@ -977,20 +1028,20 @@ public class ProcedureExecutor<TEnvironment> {
// ==========================================================================
// Executor query helpers
// ==========================================================================
public Procedure getProcedure(final long procId) {
public Procedure<TEnvironment> getProcedure(final long procId) {
return procedures.get(procId);
}
public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) {
final Procedure proc = getProcedure(procId);
public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
Procedure<TEnvironment> proc = getProcedure(procId);
if (clazz.isInstance(proc)) {
return (T)proc;
return clazz.cast(proc);
}
return null;
}
public Procedure getResult(final long procId) {
CompletedProcedureRetainer retainer = completed.get(procId);
public Procedure<TEnvironment> getResult(long procId) {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
return null;
} else {
@ -1014,8 +1065,8 @@ public class ProcedureExecutor<TEnvironment> {
* @param procId the ID of the procedure to check
* @return true if the procedure execution is started, otherwise false.
*/
public boolean isStarted(final long procId) {
final Procedure proc = procedures.get(procId);
public boolean isStarted(long procId) {
Procedure<?> proc = procedures.get(procId);
if (proc == null) {
return completed.get(procId) != null;
}
@ -1026,13 +1077,11 @@ public class ProcedureExecutor<TEnvironment> {
* Mark the specified completed procedure, as ready to remove.
* @param procId the ID of the procedure to remove
*/
public void removeResult(final long procId) {
CompletedProcedureRetainer retainer = completed.get(procId);
public void removeResult(long procId) {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + procId + " already removed by the cleaner.");
}
LOG.debug("pid={} already removed by the cleaner.", procId);
return;
}
@ -1040,8 +1089,8 @@ public class ProcedureExecutor<TEnvironment> {
retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
}
public Procedure getResultOrProcedure(final long procId) {
CompletedProcedureRetainer retainer = completed.get(procId);
public Procedure<TEnvironment> getResultOrProcedure(long procId) {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
return procedures.get(procId);
} else {
@ -1056,15 +1105,16 @@ public class ProcedureExecutor<TEnvironment> {
* @return true if the user is the owner of the procedure,
* false otherwise or the owner is unknown.
*/
public boolean isProcedureOwner(final long procId, final User user) {
if (user == null) return false;
final Procedure runningProc = procedures.get(procId);
public boolean isProcedureOwner(long procId, User user) {
if (user == null) {
return false;
}
final Procedure<TEnvironment> runningProc = procedures.get(procId);
if (runningProc != null) {
return runningProc.getOwner().equals(user.getShortName());
}
final CompletedProcedureRetainer retainer = completed.get(procId);
final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer != null) {
return retainer.getProcedure().getOwner().equals(user.getShortName());
}
@ -1078,19 +1128,17 @@ public class ProcedureExecutor<TEnvironment> {
* Get procedures.
* @return the procedures in a list
*/
public List<Procedure<?>> getProcedures() {
final List<Procedure<?>> procedureLists = new ArrayList<>(procedures.size() + completed.size());
for (Procedure<?> procedure : procedures.values()) {
procedureLists.add(procedure);
}
public List<Procedure<TEnvironment>> getProcedures() {
List<Procedure<TEnvironment>> procedureList =
new ArrayList<>(procedures.size() + completed.size());
procedureList.addAll(procedures.values());
// Note: The procedure could show up twice in the list with different state, as
// it could complete after we walk through procedures list and insert into
// procedureList - it is ok, as we will use the information in the Procedure
// to figure it out; to prevent this would increase the complexity of the logic.
for (CompletedProcedureRetainer retainer: completed.values()) {
procedureLists.add(retainer.getProcedure());
}
return procedureLists;
completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
.forEach(procedureList::add);
return procedureList;
}
// ==========================================================================
@ -1169,14 +1217,14 @@ public class ProcedureExecutor<TEnvironment> {
return procedures.keySet();
}
Long getRootProcedureId(Procedure proc) {
Long getRootProcedureId(Procedure<TEnvironment> proc) {
return Procedure.getRootProcedureId(procedures, proc);
}
// ==========================================================================
// Executions
// ==========================================================================
private void executeProcedure(final Procedure proc) {
private void executeProcedure(Procedure<TEnvironment> proc) {
final Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback
@ -1185,7 +1233,7 @@ public class ProcedureExecutor<TEnvironment> {
return;
}
final RootProcedureState procStack = rollbackStack.get(rootProcId);
RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
if (procStack == null) {
LOG.warn("RootProcedureState is null for " + proc.getProcId());
return;
@ -1197,7 +1245,7 @@ public class ProcedureExecutor<TEnvironment> {
// we have the 'rollback-lock' we can start rollingback
switch (executeRollback(rootProcId, procStack)) {
case LOCK_ACQUIRED:
break;
break;
case LOCK_YIELD_WAIT:
procStack.unsetRollback();
scheduler.yield(proc);
@ -1239,7 +1287,6 @@ public class ProcedureExecutor<TEnvironment> {
switch (lockState) {
case LOCK_ACQUIRED:
execProcedure(procStack, proc);
releaseLock(proc, false);
break;
case LOCK_YIELD_WAIT:
LOG.info(lockState + " " + proc);
@ -1254,12 +1301,6 @@ public class ProcedureExecutor<TEnvironment> {
}
procStack.release(proc);
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null && !isRunning()) {
break;
}
if (proc.isSuccess()) {
// update metrics on finishing the procedure
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
@ -1275,33 +1316,31 @@ public class ProcedureExecutor<TEnvironment> {
} while (procStack.isFailed());
}
private LockState acquireLock(final Procedure proc) {
final TEnvironment env = getEnvironment();
// hasLock() is used in conjunction with holdLock().
// This allows us to not rewrite or carry around the hasLock() flag
// for every procedure. the hasLock() have meaning only if holdLock() is true.
if (proc.holdLock(env) && proc.hasLock(env)) {
private LockState acquireLock(Procedure<TEnvironment> proc) {
TEnvironment env = getEnvironment();
// if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
// hasLock is true.
if (proc.hasLock()) {
return LockState.LOCK_ACQUIRED;
}
return proc.doAcquireLock(env);
return proc.doAcquireLock(env, store);
}
private void releaseLock(final Procedure proc, final boolean force) {
final TEnvironment env = getEnvironment();
private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
TEnvironment env = getEnvironment();
// For how the framework works, we know that we will always have the lock
// when we call releaseLock(), so we can avoid calling proc.hasLock()
if (force || !proc.holdLock(env)) {
proc.doReleaseLock(env);
if (force || !proc.holdLock(env) || proc.isFinished()) {
proc.doReleaseLock(env, store);
}
}
/**
* Execute the rollback of the full procedure stack.
* Once the procedure is rolledback, the root-procedure will be visible as
* finished to user, and the result will be the fatal exception.
* Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
* root-procedure will be visible as finished to user, and the result will be the fatal exception.
*/
private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
final Procedure rootProc = procedures.get(rootProcId);
private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
RemoteProcedureException exception = rootProc.getException();
// TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
// rolling back because the subprocedure does. Clarify.
@ -1311,13 +1350,13 @@ public class ProcedureExecutor<TEnvironment> {
store.update(rootProc);
}
final List<Procedure> subprocStack = procStack.getSubproceduresStack();
List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
int stackTail = subprocStack.size();
boolean reuseLock = false;
while (stackTail --> 0) {
final Procedure proc = subprocStack.get(stackTail);
Procedure<TEnvironment> proc = subprocStack.get(stackTail);
LockState lockState;
if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) {
@ -1334,7 +1373,7 @@ public class ProcedureExecutor<TEnvironment> {
// (e.g. StateMachineProcedure reuse the same instance)
// we can avoid to lock/unlock each step
reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
if (!reuseLock) {
if (!reuseLock && proc.hasLock()) {
releaseLock(proc, false);
}
@ -1368,13 +1407,11 @@ public class ProcedureExecutor<TEnvironment> {
* It updates the store with the new state (stack index)
* or will remove completly the procedure in case it is a child.
*/
private LockState executeRollback(final Procedure proc) {
private LockState executeRollback(Procedure<TEnvironment> proc) {
try {
proc.doRollback(getEnvironment());
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Roll back attempt failed for " + proc, e);
}
LOG.debug("Roll back attempt failed for {}", proc, e);
return LockState.LOCK_YIELD_WAIT;
} catch (InterruptedException e) {
handleInterruptedException(proc, e);
@ -1387,9 +1424,10 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
LOG.debug("TESTING: Kill before store update");
String msg = "TESTING: Kill before store update";
LOG.debug(msg);
stop();
return LockState.LOCK_YIELD_WAIT;
throw new RuntimeException(msg);
}
if (proc.removeStackIndex()) {
@ -1416,6 +1454,11 @@ public class ProcedureExecutor<TEnvironment> {
return LockState.LOCK_ACQUIRED;
}
private void yieldProcedure(Procedure<TEnvironment> proc) {
releaseLock(proc, false);
scheduler.yield(proc);
}
/**
* Executes <code>procedure</code>
* <ul>
@ -1445,10 +1488,10 @@ public class ProcedureExecutor<TEnvironment> {
* </li>
* </ul>
*/
private void execProcedure(final RootProcedureState procStack,
final Procedure<TEnvironment> procedure) {
private void execProcedure(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure) {
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
procedure.toString());
procedure.toString());
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
// The exception is caught below and then we hurry to the exit without disturbing state. The
@ -1475,22 +1518,16 @@ public class ProcedureExecutor<TEnvironment> {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Suspend " + procedure);
}
LOG.trace("Suspend {}", procedure);
suspended = true;
} catch (ProcedureYieldException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
}
scheduler.yield(procedure);
LOG.trace("Yield {}", procedure, e);
yieldProcedure(procedure);
return;
} catch (InterruptedException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
}
LOG.trace("Yield interrupt {}", procedure, e);
handleInterruptedException(procedure, e);
scheduler.yield(procedure);
yieldProcedure(procedure);
return;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
@ -1506,9 +1543,7 @@ public class ProcedureExecutor<TEnvironment> {
// i.e. we go around this loop again rather than go back out on the scheduler queue.
subprocs = null;
reExecute = true;
if (LOG.isTraceEnabled()) {
LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId());
}
LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
} else {
// Yield the current procedure, and make the subprocedure runnable
// subprocs may come back 'null'.
@ -1519,9 +1554,7 @@ public class ProcedureExecutor<TEnvironment> {
collect(Collectors.toList()).toString()));
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Added to timeoutExecutor " + procedure);
}
LOG.trace("Added to timeoutExecutor {}", procedure);
timeoutExecutor.add(procedure);
} else if (!suspended) {
// No subtask, so we are done
@ -1535,9 +1568,10 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
LOG.debug("TESTING: Kill before store update: " + procedure);
String msg = "TESTING: Kill before store update: " + procedure;
LOG.debug(msg);
stop();
return;
throw new RuntimeException(msg);
}
// TODO: The code here doesn't check if store is running before persisting to the store as
@ -1551,11 +1585,13 @@ public class ProcedureExecutor<TEnvironment> {
updateStoreOnExec(procStack, procedure, subprocs);
// if the store is not running we are aborting
if (!store.isRunning()) return;
if (!store.isRunning()) {
return;
}
// if the procedure is kind enough to pass the slot to someone else, yield
if (procedure.isRunnable() && !suspended &&
procedure.isYieldAfterExecutionStep(getEnvironment())) {
scheduler.yield(procedure);
yieldProcedure(procedure);
return;
}
@ -1566,6 +1602,11 @@ public class ProcedureExecutor<TEnvironment> {
submitChildrenProcedures(subprocs);
}
// we need to log the release lock operation before waking up the parent procedure, as there
// could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
// the sub procedures from store and cause problems...
releaseLock(procedure, false);
// if the procedure is complete and has a parent, count down the children latch.
// If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
if (!suspended && procedure.isFinished() && procedure.hasParent()) {
@ -1573,12 +1614,12 @@ public class ProcedureExecutor<TEnvironment> {
}
}
private Procedure[] initializeChildren(final RootProcedureState procStack,
final Procedure procedure, final Procedure[] subprocs) {
private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
assert subprocs != null : "expected subprocedures";
final long rootProcId = getRootProcedureId(procedure);
for (int i = 0; i < subprocs.length; ++i) {
final Procedure subproc = subprocs[i];
Procedure<TEnvironment> subproc = subprocs[i];
if (subproc == null) {
String msg = "subproc[" + i + "] is null, aborting the procedure";
procedure.setFailure(new RemoteProcedureException(msg,
@ -1609,9 +1650,9 @@ public class ProcedureExecutor<TEnvironment> {
return subprocs;
}
private void submitChildrenProcedures(final Procedure[] subprocs) {
private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
for (int i = 0; i < subprocs.length; ++i) {
final Procedure subproc = subprocs[i];
Procedure<TEnvironment> subproc = subprocs[i];
subproc.updateMetricsOnSubmit(getEnvironment());
assert !procedures.containsKey(subproc.getProcId());
procedures.put(subproc.getProcId(), subproc);
@ -1619,8 +1660,9 @@ public class ProcedureExecutor<TEnvironment> {
}
}
private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) {
final Procedure parent = procedures.get(procedure.getParentProcId());
private void countDownChildren(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure) {
Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
if (parent == null) {
assert procStack.isRollingback();
return;
@ -1637,17 +1679,15 @@ public class ProcedureExecutor<TEnvironment> {
}
}
private void updateStoreOnExec(final RootProcedureState procStack,
final Procedure procedure, final Procedure[] subprocs) {
private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
}
store.insert(procedure, subprocs);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Store update " + procedure);
}
LOG.trace("Store update {}", procedure);
if (procedure.isFinished() && !procedure.hasParent()) {
// remove child procedures
final long[] childProcIds = procStack.getSubprocedureIds();
@ -1665,11 +1705,8 @@ public class ProcedureExecutor<TEnvironment> {
}
}
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e);
}
private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
// NOTE: We don't call Thread.currentThread().interrupt()
// because otherwise all the subsequent calls e.g. Thread.sleep() will throw
// the InterruptedException. If the master is going down, we will be notified
@ -1677,9 +1714,13 @@ public class ProcedureExecutor<TEnvironment> {
// (The interrupted procedure will be retried on the next run)
}
private void execCompletionCleanup(final Procedure proc) {
private void execCompletionCleanup(Procedure<TEnvironment> proc) {
final TEnvironment env = getEnvironment();
if (proc.holdLock(env) && proc.hasLock(env)) {
if (proc.hasLock()) {
LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
" is finished, even if the holdLock is true, arrive here means we have some holes where" +
" we do not release the lock. And the releaseLock below may fail since the procedure may" +
" have already been deleted from the procedure store.");
releaseLock(proc, true);
}
try {
@ -1690,11 +1731,11 @@ public class ProcedureExecutor<TEnvironment> {
}
}
private void procedureFinished(final Procedure proc) {
private void procedureFinished(Procedure<TEnvironment> proc) {
// call the procedure completion cleanup handler
execCompletionCleanup(proc);
CompletedProcedureRetainer retainer = new CompletedProcedureRetainer(proc);
CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
// update the executor internal state maps
if (!proc.shouldWaitClientAck(getEnvironment())) {
@ -1710,14 +1751,14 @@ public class ProcedureExecutor<TEnvironment> {
scheduler.completionCleanup(proc);
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e);
LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
}
// Notify the listeners
sendProcedureFinishedNotification(proc.getProcId());
}
RootProcedureState getProcStack(long rootProcId) {
RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
return rollbackStack.get(rootProcId);
}
@ -1726,7 +1767,7 @@ public class ProcedureExecutor<TEnvironment> {
// ==========================================================================
private class WorkerThread extends StoppableThread {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
private volatile Procedure<?> activeProcedure;
private volatile Procedure<TEnvironment> activeProcedure;
public WorkerThread(ThreadGroup group) {
this(group, "PEWorker-");
@ -1747,7 +1788,7 @@ public class ProcedureExecutor<TEnvironment> {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
Procedure<?> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (proc == null) {
continue;
}

View File

@ -202,6 +202,9 @@ public final class ProcedureUtil {
builder.setNonce(proc.getNonceKey().getNonce());
}
if (proc.hasLock()) {
builder.setLocked(true);
}
return builder.build();
}
@ -255,6 +258,10 @@ public final class ProcedureUtil {
proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
}
if (proto.getLocked()) {
proc.lockedWhenLoading();
}
ProcedureStateSerializer serializer = null;
if (proto.getStateMessageCount() > 0) {

View File

@ -22,11 +22,9 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
@ -42,8 +40,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class RootProcedureState {
private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
class RootProcedureState<TEnvironment> {
private enum State {
RUNNING, // The Procedure is running or ready to run
@ -51,8 +48,8 @@ class RootProcedureState {
ROLLINGBACK, // The Procedure failed and the execution was rolledback
}
private Set<Procedure> subprocs = null;
private ArrayList<Procedure> subprocStack = null;
private Set<Procedure<TEnvironment>> subprocs = null;
private ArrayList<Procedure<TEnvironment>> subprocStack = null;
private State state = State.RUNNING;
private int running = 0;
@ -91,22 +88,19 @@ class RootProcedureState {
}
protected synchronized long[] getSubprocedureIds() {
if (subprocs == null) return null;
int index = 0;
final long[] subIds = new long[subprocs.size()];
for (Procedure proc: subprocs) {
subIds[index++] = proc.getProcId();
if (subprocs == null) {
return null;
}
return subIds;
return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
}
protected synchronized List<Procedure> getSubproceduresStack() {
protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
return subprocStack;
}
protected synchronized RemoteProcedureException getException() {
if (subprocStack != null) {
for (Procedure proc: subprocStack) {
for (Procedure<TEnvironment> proc: subprocStack) {
if (proc.hasException()) {
return proc.getException();
}
@ -118,8 +112,10 @@ class RootProcedureState {
/**
* Called by the ProcedureExecutor to mark the procedure step as running.
*/
protected synchronized boolean acquire(final Procedure proc) {
if (state != State.RUNNING) return false;
protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
if (state != State.RUNNING) {
return false;
}
running++;
return true;
@ -128,7 +124,7 @@ class RootProcedureState {
/**
* Called by the ProcedureExecutor to mark the procedure step as finished.
*/
protected synchronized void release(final Procedure proc) {
protected synchronized void release(Procedure<TEnvironment> proc) {
running--;
}
@ -142,7 +138,7 @@ class RootProcedureState {
* Called by the ProcedureExecutor after the procedure step is completed,
* to add the step to the rollback list (or procedure stack)
*/
protected synchronized void addRollbackStep(final Procedure proc) {
protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
if (proc.isFailed()) {
state = State.FAILED;
}
@ -153,8 +149,10 @@ class RootProcedureState {
subprocStack.add(proc);
}
protected synchronized void addSubProcedure(final Procedure proc) {
if (!proc.hasParent()) return;
protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
if (!proc.hasParent()) {
return;
}
if (subprocs == null) {
subprocs = new HashSet<>();
}
@ -168,7 +166,7 @@ class RootProcedureState {
* to the store only the Procedure we executed, and nothing else.
* on load we recreate the full stack by aggregating each procedure stack-positions.
*/
protected synchronized void loadStack(final Procedure proc) {
protected synchronized void loadStack(Procedure<TEnvironment> proc) {
addSubProcedure(proc);
int[] stackIndexes = proc.getStackIndexes();
if (stackIndexes != null) {
@ -196,7 +194,7 @@ class RootProcedureState {
*/
protected synchronized boolean isValid() {
if (subprocStack != null) {
for (Procedure proc: subprocStack) {
for (Procedure<TEnvironment> proc : subprocStack) {
if (proc == null) {
return false;
}

View File

@ -31,15 +31,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* @see InlineChore
*/
@InterfaceAudience.Private
class TimeoutExecutorThread extends StoppableThread {
class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);
private final ProcedureExecutor<?> executor;
private final ProcedureExecutor<TEnvironment> executor;
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
public TimeoutExecutorThread(ProcedureExecutor<?> executor, ThreadGroup group) {
public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
setDaemon(true);
this.executor = executor;
@ -65,7 +65,7 @@ class TimeoutExecutorThread extends StoppableThread {
if (task instanceof InlineChore) {
execInlineChore((InlineChore) task);
} else if (task instanceof DelayedProcedure) {
execDelayedProcedure((DelayedProcedure) task);
execDelayedProcedure((DelayedProcedure<TEnvironment>) task);
} else {
LOG.error("CODE-BUG unknown timeout task type {}", task);
}
@ -77,15 +77,15 @@ class TimeoutExecutorThread extends StoppableThread {
queue.add(chore);
}
public void add(Procedure<?> procedure) {
public void add(Procedure<TEnvironment> procedure) {
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure(procedure));
queue.add(new DelayedProcedure<>(procedure));
}
public boolean remove(Procedure<?> procedure) {
return queue.remove(new DelayedProcedure(procedure));
public boolean remove(Procedure<TEnvironment> procedure) {
return queue.remove(new DelayedProcedure<>(procedure));
}
private void execInlineChore(InlineChore chore) {
@ -93,13 +93,13 @@ class TimeoutExecutorThread extends StoppableThread {
add(chore);
}
private void execDelayedProcedure(DelayedProcedure delayed) {
private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) {
// TODO: treat this as a normal procedure, add it to the scheduler and
// let one of the workers handle it.
// Today we consider ProcedureInMemoryChore as InlineChores
Procedure<?> procedure = delayed.getObject();
Procedure<TEnvironment> procedure = delayed.getObject();
if (procedure instanceof ProcedureInMemoryChore) {
executeInMemoryChore((ProcedureInMemoryChore) procedure);
executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) procedure);
// if the procedure is in a waiting state again, put it back in the queue
procedure.updateTimestamp();
if (procedure.isWaiting()) {
@ -111,7 +111,7 @@ class TimeoutExecutorThread extends StoppableThread {
}
}
private void executeInMemoryChore(ProcedureInMemoryChore chore) {
private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) {
if (!chore.isWaiting()) {
return;
}
@ -126,12 +126,12 @@ class TimeoutExecutorThread extends StoppableThread {
}
}
private void executeTimedoutProcedure(Procedure proc) {
private void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
// The procedure received a timeout. if the procedure itself does not handle it,
// call abort() and add the procedure back in the queue for rollback.
if (proc.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(proc);
RootProcedureState procStack = executor.getProcStack(rootProcId);
RootProcedureState<TEnvironment> procStack = executor.getProcStack(rootProcId);
procStack.abort();
executor.getStore().update(proc);
executor.getScheduler().addFront(proc);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
@ -42,7 +43,12 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
@Category({MasterTests.class, LargeTests.class})
/**
* For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so
* we should use lock to obtain the correct order. Ignored.
*/
@Ignore
@Category({ MasterTests.class, LargeTests.class })
public class TestProcedureReplayOrder {
@ClassRule

View File

@ -227,7 +227,6 @@ public class TestProcedureSuspended {
protected void releaseLock(final TestProcEnv env) {
LOG.info("RELEASE LOCK " + this + " " + hasLock);
lock.set(false);
hasLock = false;
}
@Override
@ -235,11 +234,6 @@ public class TestProcedureSuspended {
return true;
}
@Override
protected boolean hasLock(final TestProcEnv env) {
return hasLock;
}
public ArrayList<Long> getTimestamps() {
return timestamps;
}

View File

@ -63,6 +63,9 @@ message Procedure {
// Nonce to prevent same procedure submit by multiple times
optional uint64 nonce_group = 13 [default = 0];
optional uint64 nonce = 14 [default = 0];
// whether the procedure has held the lock
optional bool locked = 16 [default = false];
}
/**

View File

@ -81,8 +81,8 @@ class ClusterSchemaServiceImpl extends AbstractService implements ClusterSchemaS
return this.tableNamespaceManager;
}
private long submitProcedure(final Procedure<?> procedure, final NonceKey nonceKey)
throws ServiceNotRunningException {
private long submitProcedure(final Procedure<MasterProcedureEnv> procedure,
final NonceKey nonceKey) throws ServiceNotRunningException {
checkIsRunning();
ProcedureExecutor<MasterProcedureEnv> pe = this.masterServices.getMasterProcedureExecutor();
return pe.submitProcedure(procedure, nonceKey);

View File

@ -923,7 +923,7 @@ public class HMaster extends HRegionServer implements MasterServices {
InitMetaProcedure initMetaProc = null;
if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO)
.isOffline()) {
Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream()
Optional<Procedure<MasterProcedureEnv>> optProc = procedureExecutor.getProcedures().stream()
.filter(p -> p instanceof InitMetaProcedure).findAny();
if (optProc.isPresent()) {
initMetaProc = (InitMetaProcedure) optProc.get();
@ -3202,7 +3202,8 @@ public class HMaster extends HRegionServer implements MasterServices {
cpHost.preGetProcedures();
}
final List<Procedure<?>> procList = this.procedureExecutor.getProcedures();
@SuppressWarnings({ "unchecked", "rawtypes" })
List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
if (cpHost != null) {
cpHost.postGetProcedures(procList);
@ -3717,7 +3718,7 @@ public class HMaster extends HRegionServer implements MasterServices {
HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
new HashMap<>(peerList.size());
peerList.stream()
.forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList()));
.forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
for (ServerName serverName : serverNames) {
List<ReplicationLoadSource> replicationLoadSources =
getServerManager().getLoad(serverName).getReplicationLoadSourceList();

View File

@ -148,9 +148,4 @@ public class GCRegionProcedure extends AbstractStateMachineRegionProcedure<GCReg
serializer.deserialize(MasterProcedureProtos.GCRegionStateData.class);
setRegion(ProtobufUtil.toRegionInfo(msg.getRegionInfo()));
}
@Override
protected org.apache.hadoop.hbase.procedure2.Procedure.LockState acquireLock(MasterProcedureEnv env) {
return super.acquireLock(env);
}
}

View File

@ -82,7 +82,6 @@ public class MergeTableRegionsProcedure
extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
private static final Logger LOG = LoggerFactory.getLogger(MergeTableRegionsProcedure.class);
private Boolean traceEnabled;
private volatile boolean lock = false;
private ServerName regionLocation;
private RegionInfo[] regionsToMerge;
private RegionInfo mergedRegion;
@ -420,24 +419,20 @@ public class MergeTableRegionsProcedure
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitRegions(this, getTableName(),
mergedRegion, regionsToMerge[0], regionsToMerge[1])) {
try {
LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// Ignore, just for logging
}
return LockState.LOCK_EVENT_WAIT;
}
this.lock = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
this.lock = false;
env.getProcedureScheduler().wakeRegions(this, getTableName(),
mergedRegion, regionsToMerge[0], regionsToMerge[1]);
}
@ -447,11 +442,6 @@ public class MergeTableRegionsProcedure
return true;
}
@Override
protected boolean hasLock(MasterProcedureEnv env) {
return this.lock;
}
@Override
public TableName getTableName() {
return mergedRegion.getTable();

View File

@ -34,14 +34,16 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
* Base class for the Assign and Unassign Procedure.
*
@ -114,8 +116,6 @@ public abstract class RegionTransitionProcedure
*/
private int attempt;
private volatile boolean lock = false;
// Required by the Procedure framework to create the procedure on replay
public RegionTransitionProcedure() {}
@ -419,15 +419,17 @@ public abstract class RegionTransitionProcedure
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
protected boolean waitInitialized(MasterProcedureEnv env) {
// Unless we are assigning meta, wait for meta to be available and loaded.
if (!isMeta()) {
AssignmentManager am = env.getAssignmentManager();
if (am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo)) {
return LockState.LOCK_EVENT_WAIT;
}
if (isMeta()) {
return false;
}
AssignmentManager am = env.getAssignmentManager();
return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo);
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
// TODO: Revisit this and move it to the executor
if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
try {
@ -438,14 +440,12 @@ public abstract class RegionTransitionProcedure
}
return LockState.LOCK_EVENT_WAIT;
}
this.lock = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
lock = false;
}
@Override
@ -453,11 +453,6 @@ public abstract class RegionTransitionProcedure
return true;
}
@Override
protected boolean hasLock(final MasterProcedureEnv env) {
return lock;
}
@Override
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
// The operation is triggered internally on the server

View File

@ -76,8 +76,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
private String description;
// True when recovery of master lock from WALs
private boolean recoveredMasterLock;
// this is for internal working
private boolean hasLock;
private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
// True if this proc acquired relevant locks. This value is for client checks.
@ -306,7 +304,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
protected LockState acquireLock(final MasterProcedureEnv env) {
boolean ret = lock.acquireLock(env);
locked.set(ret);
hasLock = ret;
if (ret) {
if (LOG.isDebugEnabled()) {
LOG.debug("LOCKED " + toString());
@ -321,7 +318,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
@Override
protected void releaseLock(final MasterProcedureEnv env) {
lock.releaseLock(env);
hasLock = false;
}
/**
@ -423,11 +419,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
return true;
}
@Override
public boolean hasLock(final MasterProcedureEnv env) {
return hasLock;
}
///////////////////////
// LOCK IMPLEMENTATIONS
///////////////////////

View File

@ -65,9 +65,13 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState>
sb.append(getNamespaceName());
}
@Override
protected boolean waitInitialized(MasterProcedureEnv env) {
return env.waitInitialized(this);
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) {
return LockState.LOCK_EVENT_WAIT;
}

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
public abstract class AbstractStateMachineRegionProcedure<TState>
extends AbstractStateMachineTableProcedure<TState> {
private RegionInfo hri;
private volatile boolean lock = false;
public AbstractStateMachineRegionProcedure(final MasterProcedureEnv env,
final RegionInfo hri) {
@ -100,25 +99,17 @@ public abstract class AbstractStateMachineRegionProcedure<TState>
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitRegions(this, getTableName(), getRegion())) {
return LockState.LOCK_EVENT_WAIT;
}
this.lock = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
this.lock = false;
env.getProcedureScheduler().wakeRegions(this, getTableName(), getRegion());
}
@Override
protected boolean hasLock(final MasterProcedureEnv env) {
return this.lock;
}
protected void setFailure(Throwable cause) {
super.setFailure(getClass().getSimpleName(), cause);
}

View File

@ -88,11 +88,13 @@ public abstract class AbstractStateMachineTableProcedure<TState>
sb.append(getTableName());
}
@Override
protected boolean waitInitialized(MasterProcedureEnv env) {
return env.waitInitialized(this);
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) {
return LockState.LOCK_EVENT_WAIT;
}
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}

View File

@ -131,7 +131,7 @@ public class CreateNamespaceProcedure
@Override
protected CreateNamespaceState getState(final int stateId) {
return CreateNamespaceState.valueOf(stateId);
return CreateNamespaceState.forNumber(stateId);
}
@Override
@ -171,15 +171,18 @@ public class CreateNamespaceProcedure
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (!env.getMasterServices().isInitialized()) {
// Namespace manager might not be ready if master is not fully initialized,
// return false to reject user namespace creation; return true for default
// and system namespace creation (this is part of master initialization).
if (!isBootstrapNamespace() && env.waitInitialized(this)) {
return LockState.LOCK_EVENT_WAIT;
}
protected boolean waitInitialized(MasterProcedureEnv env) {
// Namespace manager might not be ready if master is not fully initialized,
// return false to reject user namespace creation; return true for default
// and system namespace creation (this is part of master initialization).
if (isBootstrapNamespace()) {
return false;
}
return env.waitInitialized(this);
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) {
return LockState.LOCK_EVENT_WAIT;
}
@ -263,20 +266,6 @@ public class CreateNamespaceProcedure
}
}
/**
* remove quota for the namespace if exists
* @param env MasterProcedureEnv
* @throws IOException
**/
private void rollbackSetNamespaceQuota(final MasterProcedureEnv env) throws IOException {
try {
DeleteNamespaceProcedure.removeNamespaceQuota(env, nsDescriptor.getName());
} catch (Exception e) {
// Ignore exception
LOG.debug("Rollback of setNamespaceQuota throws exception: " + e);
}
}
private static TableNamespaceManager getTableNamespaceManager(final MasterProcedureEnv env) {
return env.getMasterServices().getClusterSchema().getTableNamespaceManager();
}

View File

@ -220,10 +220,16 @@ public class CreateTableProcedure
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
return LockState.LOCK_EVENT_WAIT;
protected boolean waitInitialized(MasterProcedureEnv env) {
if (getTableName().isSystemTable()) {
// Creating system table is part of the initialization, so do not wait here.
return false;
}
return super.waitInitialized(env);
}
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}

View File

@ -63,8 +63,13 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure<InitMe
}
@Override
protected LockState acquireLock(MasterProcedureEnv env) {
protected boolean waitInitialized(MasterProcedureEnv env) {
// we do not need to wait for master initialized, we are part of the initialization.
return false;
}
@Override
protected LockState acquireLock(MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}

View File

@ -143,21 +143,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
final Queue<T> queue, final Procedure<?> proc, final boolean addFront) {
if (!queue.getLockStatus().hasExclusiveLock() ||
queue.getLockStatus().isLockOwner(proc.getProcId())) {
// if the queue was not remove for an xlock execution
// or the proc is the lock owner, put the queue back into execution
if (!queue.getLockStatus().hasExclusiveLock()) {
// if the queue was not remove for an xlock execution,put the queue back into execution
queue.add(proc, addFront);
addToRunQueue(fairq, queue);
} else if (queue.getLockStatus().hasParentLock(proc)) {
// always add it to front as its parent has the xlock
// usually the addFront is true if we arrive here as we will call addFront for adding sub
// proc, but sometimes we may retry on the proc which means we will arrive here through yield,
// so it is possible the addFront here is false.
} else if (queue.getLockStatus().hasLockAccess(proc)) {
// always add it to front as the have the lock access.
queue.add(proc, true);
// our (proc) parent has the xlock,
// so the queue is not in the fairq (run-queue)
// add it back to let the child run (inherit the lock)
addToRunQueue(fairq, queue);
} else {
queue.add(proc, addFront);
@ -386,9 +378,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
if (proc != null) {
priority = MasterProcedureUtil.getServerPriority(proc);
} else {
LOG.warn("Usually this should not happen as proc can only be null when calling from " +
"wait/wake lock, which means at least we should have one procedure in the queue which " +
"wants to acquire the lock or just released the lock.");
priority = 1;
}
node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
@ -848,9 +837,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
try {
final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) {
// We do not need to create a new queue so just pass null, as in tests we may pass
// procedures other than ServerProcedureInterface
removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
removeFromRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null));
return false;
}
waitProcedure(lock, procedure);
@ -873,9 +865,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
final LockAndQueue lock = locking.getServerLock(serverName);
// Only SCP will acquire/release server lock so do not need to check the return value here.
lock.releaseExclusiveLock(procedure);
// We do not need to create a new queue so just pass null, as in tests we may pass procedures
// other than ServerProcedureInterface
addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
addToRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null));
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {

View File

@ -99,7 +99,7 @@ public final class MasterProcedureUtil {
protected abstract void run() throws IOException;
protected abstract String getDescription();
protected long submitProcedure(final Procedure<?> proc) {
protected long submitProcedure(final Procedure<MasterProcedureEnv> proc) {
assert procId == null : "submitProcedure() was already called, running procId=" + procId;
procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
return procId;

View File

@ -29,20 +29,6 @@ class PeerQueue extends Queue<String> {
super(peerId, lockStatus);
}
@Override
public boolean isAvailable() {
if (isEmpty()) {
return false;
}
if (getLockStatus().hasExclusiveLock()) {
// if we have an exclusive lock already taken
// only child of the lock owner can be executed
Procedure<?> nextProc = peek();
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
}
return true;
}
@Override
public boolean requireExclusiveLock(Procedure<?> proc) {
return requirePeerExclusiveLock((PeerProcedureInterface) proc);

View File

@ -106,7 +106,7 @@ public final class ProcedureSyncWait {
}
public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure<?> proc) {
final Procedure<MasterProcedureEnv> proc) {
if (proc.isInitializing()) {
procExec.submitProcedure(proc);
}
@ -114,7 +114,7 @@ public final class ProcedureSyncWait {
}
public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
final Procedure<?> proc) throws IOException {
final Procedure<MasterProcedureEnv> proc) throws IOException {
if (proc.isInitializing()) {
procExec.submitProcedure(proc);
}

View File

@ -63,7 +63,18 @@ abstract class Queue<TKey extends Comparable<TKey>> extends AvlLinkedNode<Queue<
// This should go away when we have the new AM and its events
// and we move xlock to the lock-event-queue.
public boolean isAvailable() {
return !lockStatus.hasExclusiveLock() && !isEmpty();
if (isEmpty()) {
return false;
}
if (getLockStatus().hasExclusiveLock()) {
// If we have an exclusive lock already taken, only child of the lock owner can be executed
// And now we will restore locks when master restarts, so it is possible that the procedure
// which is holding the lock is also in the queue, so we need to use hasLockAccess here
// instead of hasParentLock
Procedure<?> nextProc = peek();
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
}
return true;
}
// ======================================================================

View File

@ -36,8 +36,6 @@ public abstract class AbstractPeerProcedure<TState>
protected String peerId;
private volatile boolean locked;
// used to keep compatible with old client where we can only returns after updateStorage.
protected ProcedurePrepareLatch latch;
@ -58,18 +56,21 @@ public abstract class AbstractPeerProcedure<TState>
return peerId;
}
@Override
protected boolean waitInitialized(MasterProcedureEnv env) {
return env.waitInitialized(this);
}
@Override
protected LockState acquireLock(MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) {
return LockState.LOCK_EVENT_WAIT;
}
locked = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(MasterProcedureEnv env) {
locked = false;
env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
}
@ -78,11 +79,6 @@ public abstract class AbstractPeerProcedure<TState>
return true;
}
@Override
protected boolean hasLock(MasterProcedureEnv env) {
return locked;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);

View File

@ -46,7 +46,7 @@
long millisFromLastRoll = walStore.getMillisFromLastRoll();
ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
List<Procedure<?>> procedures = procExecutor.getProcedures();
List<Procedure<MasterProcedureEnv>> procedures = procExecutor.getProcedures();
Collections.sort(procedures, new Comparator<Procedure>() {
@Override
public int compare(Procedure lhs, Procedure rhs) {

View File

@ -124,7 +124,7 @@ public class TestGetProcedureResult {
@Test
public void testRace() throws Exception {
ProcedureExecutor<?> executor =
ProcedureExecutor<MasterProcedureEnv> executor =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
DummyProcedure p = new DummyProcedure();
long procId = executor.submitProcedure(p);

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -434,7 +435,7 @@ public class TestAssignmentManager {
am.wakeMetaLoadedEvent();
}
private Future<byte[]> submitProcedure(final Procedure<?> proc) {
private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
}

View File

@ -111,7 +111,7 @@ public class TestMasterProcedureEvents {
}
private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent<?> event,
final Procedure<?> proc) throws Exception {
final Procedure<MasterProcedureEnv> proc) throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler();

View File

@ -207,7 +207,7 @@ public class TestProcedureAdmin {
// Wait for one step to complete
ProcedureTestingUtility.waitProcedure(procExec, procId);
List<Procedure<?>> procedures = procExec.getProcedures();
List<Procedure<MasterProcedureEnv>> procedures = procExec.getProcedures();
assertTrue(procedures.size() >= 1);
boolean found = false;
for (Procedure<?> proc: procedures) {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -88,7 +89,7 @@ public class TestFailedProcCleanup {
LOG.debug("Ignoring exception: ", e);
Thread.sleep(evictionDelay * 3);
}
List<Procedure<?>> procedureInfos =
List<Procedure<MasterProcedureEnv>> procedureInfos =
TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures();
for (Procedure procedureInfo : procedureInfos) {
if (procedureInfo.getProcName().equals("CreateTableProcedure")
@ -109,7 +110,7 @@ public class TestFailedProcCleanup {
LOG.debug("Ignoring exception: ", e);
Thread.sleep(evictionDelay * 3);
}
List<Procedure<?>> procedureInfos =
List<Procedure<MasterProcedureEnv>> procedureInfos =
TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures();
for (Procedure procedureInfo : procedureInfos) {
if (procedureInfo.getProcName().equals("CreateTableProcedure")

View File

@ -587,7 +587,7 @@ public class TestAccessController extends SecureTestUtil {
Procedure proc = new TestTableDDLProcedure(procExec.getEnvironment(), tableName);
proc.setOwner(USER_OWNER);
procExec.submitProcedure(proc);
final List<Procedure<?>> procList = procExec.getProcedures();
final List<Procedure<MasterProcedureEnv>> procList = procExec.getProcedures();
AccessTestAction getProceduresAction = new AccessTestAction() {
@Override