HBASE-16587 Procedure v2 - Cleanup suspended proc execution
This commit is contained in:
parent
5f7e642fed
commit
e01e05cc0e
|
@ -163,6 +163,23 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
// no-op
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to keep the procedure lock even when the procedure is yielding or suspended.
|
||||
* @return true if the procedure should hold on the lock until completionCleanup()
|
||||
*/
|
||||
protected boolean holdLock(final TEnvironment env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used in conjuction with holdLock(). If holdLock() is true
|
||||
* the procedure executor will not call acquireLock() if hasLock() is true.
|
||||
* @return true if the procedure has the lock, false otherwise.
|
||||
*/
|
||||
protected boolean hasLock(final TEnvironment env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the procedure is loaded for replay.
|
||||
* The procedure implementor may use this method to perform some quick
|
||||
|
@ -173,6 +190,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
// no-op
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the procedure is ready to be added to the queue after
|
||||
* the loading/replay operation.
|
||||
*/
|
||||
protected void afterReplay(final TEnvironment env) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the procedure is marked as completed (success or rollback).
|
||||
* The procedure implementor may use this method to cleanup in-memory states.
|
||||
|
@ -339,6 +364,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
return state == ProcedureState.RUNNABLE;
|
||||
}
|
||||
|
||||
public synchronized boolean isInitializing() {
|
||||
return state == ProcedureState.INITIALIZING;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the procedure has failed.
|
||||
* true may mean failed but not yet rolledback or failed and rolledback.
|
||||
|
@ -479,8 +508,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
setFailure(source, new ProcedureAbortedException(msg));
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
protected synchronized boolean setTimeoutFailure() {
|
||||
/**
|
||||
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
|
||||
* @return true to let the framework handle the timeout as abort,
|
||||
* false in case the procedure handled the timeout itself.
|
||||
*/
|
||||
protected synchronized boolean setTimeoutFailure(final TEnvironment env) {
|
||||
if (state == ProcedureState.WAITING_TIMEOUT) {
|
||||
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
|
||||
setFailure("ProcedureExecutor", new TimeoutIOException(
|
||||
|
@ -548,6 +581,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal method called by the ProcedureExecutor that starts the
|
||||
* user-level code acquireLock().
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected boolean doAcquireLock(final TEnvironment env) {
|
||||
return acquireLock(env);
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal method called by the ProcedureExecutor that starts the
|
||||
* user-level code releaseLock().
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected void doReleaseLock(final TEnvironment env) {
|
||||
releaseLock(env);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called on store load to initialize the Procedure internals after
|
||||
* the creation/deserialization.
|
||||
|
@ -601,6 +652,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
return childrenLatch > 0;
|
||||
}
|
||||
|
||||
protected synchronized int getChildrenLatch() {
|
||||
return childrenLatch;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the RootProcedureState on procedure execution.
|
||||
* Each procedure store its stack-index positions.
|
||||
|
|
|
@ -438,6 +438,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// some procedure may be started way before this stuff.
|
||||
for (int i = runnableList.size() - 1; i >= 0; --i) {
|
||||
Procedure proc = runnableList.get(i);
|
||||
proc.afterReplay(getEnvironment());
|
||||
if (!proc.hasParent()) {
|
||||
sendProcedureLoadedNotification(proc.getProcId());
|
||||
}
|
||||
|
@ -857,9 +858,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
|
||||
// Execute the procedure
|
||||
assert proc.getState() == ProcedureState.RUNNABLE : proc;
|
||||
if (proc.acquireLock(getEnvironment())) {
|
||||
if (acquireLock(proc)) {
|
||||
execProcedure(procStack, proc);
|
||||
proc.releaseLock(getEnvironment());
|
||||
releaseLock(proc, false);
|
||||
} else {
|
||||
runnables.yield(proc);
|
||||
}
|
||||
|
@ -879,12 +880,34 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// Finalize the procedure state
|
||||
if (proc.getProcId() == rootProcId) {
|
||||
procedureFinished(proc);
|
||||
} else {
|
||||
execCompletionCleanup(proc);
|
||||
}
|
||||
break;
|
||||
}
|
||||
} while (procStack.isFailed());
|
||||
}
|
||||
|
||||
private boolean acquireLock(final Procedure proc) {
|
||||
final TEnvironment env = getEnvironment();
|
||||
// hasLock() is used in conjunction with holdLock().
|
||||
// This allows us to not rewrite or carry around the hasLock() flag
|
||||
// for every procedure. the hasLock() have meaning only if holdLock() is true.
|
||||
if (proc.holdLock(env) && proc.hasLock(env)) {
|
||||
return true;
|
||||
}
|
||||
return proc.doAcquireLock(env);
|
||||
}
|
||||
|
||||
private void releaseLock(final Procedure proc, final boolean force) {
|
||||
final TEnvironment env = getEnvironment();
|
||||
// for how the framework works, we know that we will always have the lock
|
||||
// when we call releaseLock(), so we can avoid calling proc.hasLock()
|
||||
if (force || !proc.holdLock(env)) {
|
||||
proc.doReleaseLock(env);
|
||||
}
|
||||
}
|
||||
|
||||
private void timeoutLoop() {
|
||||
while (isRunning()) {
|
||||
Procedure proc = waitingTimeout.poll();
|
||||
|
@ -921,15 +944,17 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
continue;
|
||||
}
|
||||
|
||||
// The procedure received an "abort-timeout", call abort() and
|
||||
// add the procedure back in the queue for rollback.
|
||||
if (proc.setTimeoutFailure()) {
|
||||
// The procedure received a timeout. if the procedure itself does not handle it,
|
||||
// call abort() and add the procedure back in the queue for rollback.
|
||||
if (proc.setTimeoutFailure(getEnvironment())) {
|
||||
long rootProcId = Procedure.getRootProcedureId(procedures, proc);
|
||||
RootProcedureState procStack = rollbackStack.get(rootProcId);
|
||||
procStack.abort();
|
||||
store.update(proc);
|
||||
runnables.addFront(proc);
|
||||
continue;
|
||||
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
|
||||
waitingTimeout.add(proc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -940,7 +965,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* finished to user, and the result will be the fatal exception.
|
||||
*/
|
||||
private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
|
||||
Procedure rootProc = procedures.get(rootProcId);
|
||||
final Procedure rootProc = procedures.get(rootProcId);
|
||||
RemoteProcedureException exception = rootProc.getException();
|
||||
if (exception == null) {
|
||||
exception = procStack.getException();
|
||||
|
@ -948,7 +973,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
store.update(rootProc);
|
||||
}
|
||||
|
||||
List<Procedure> subprocStack = procStack.getSubproceduresStack();
|
||||
final List<Procedure> subprocStack = procStack.getSubproceduresStack();
|
||||
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
|
||||
|
||||
int stackTail = subprocStack.size();
|
||||
|
@ -956,7 +981,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
while (stackTail --> 0) {
|
||||
final Procedure proc = subprocStack.get(stackTail);
|
||||
|
||||
if (!reuseLock && !proc.acquireLock(getEnvironment())) {
|
||||
if (!reuseLock && !acquireLock(proc)) {
|
||||
// can't take a lock on the procedure, add the root-proc back on the
|
||||
// queue waiting for the lock availability
|
||||
return false;
|
||||
|
@ -970,7 +995,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// we can avoid to lock/unlock each step
|
||||
reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
|
||||
if (!reuseLock) {
|
||||
proc.releaseLock(getEnvironment());
|
||||
releaseLock(proc, false);
|
||||
}
|
||||
|
||||
// allows to kill the executor before something is stored to the wal.
|
||||
|
@ -985,6 +1010,10 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (proc != rootProc) {
|
||||
execCompletionCleanup(proc);
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize the procedure state
|
||||
|
@ -1302,14 +1331,22 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return Procedure.getRootProcedureId(procedures, proc);
|
||||
}
|
||||
|
||||
private void procedureFinished(final Procedure proc) {
|
||||
// call the procedure completion cleanup handler
|
||||
private void execCompletionCleanup(final Procedure proc) {
|
||||
final TEnvironment env = getEnvironment();
|
||||
if (proc.holdLock(env) && proc.hasLock(env)) {
|
||||
releaseLock(proc, true);
|
||||
}
|
||||
try {
|
||||
proc.completionCleanup(getEnvironment());
|
||||
proc.completionCleanup(env);
|
||||
} catch (Throwable e) {
|
||||
// Catch NullPointerExceptions or similar errors...
|
||||
LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void procedureFinished(final Procedure proc) {
|
||||
// call the procedure completion cleanup handler
|
||||
execCompletionCleanup(proc);
|
||||
|
||||
// update the executor internal state maps
|
||||
ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());
|
||||
|
|
|
@ -65,14 +65,23 @@ public class RemoteProcedureException extends ProcedureException {
|
|||
return source;
|
||||
}
|
||||
|
||||
public IOException unwrapRemoteException() {
|
||||
if (getCause() instanceof RemoteException) {
|
||||
return ((RemoteException)getCause()).unwrapRemoteException();
|
||||
public Exception unwrapRemoteException() {
|
||||
final Throwable cause = getCause();
|
||||
if (cause instanceof RemoteException) {
|
||||
return ((RemoteException)cause).unwrapRemoteException();
|
||||
}
|
||||
if (getCause() instanceof IOException) {
|
||||
return (IOException)getCause();
|
||||
if (cause instanceof Exception) {
|
||||
return (Exception)cause;
|
||||
}
|
||||
return new IOException(getCause());
|
||||
return new Exception(cause);
|
||||
}
|
||||
|
||||
public IOException unwrapRemoteIOException() {
|
||||
final Exception cause = unwrapRemoteException();
|
||||
if (cause instanceof IOException) {
|
||||
return (IOException)cause;
|
||||
}
|
||||
return new IOException(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -131,7 +131,9 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
subProcList = new ArrayList<Procedure>(subProcedure.length);
|
||||
}
|
||||
for (int i = 0; i < subProcedure.length; ++i) {
|
||||
subProcList.add(subProcedure[i]);
|
||||
Procedure proc = subProcedure[i];
|
||||
if (!proc.hasOwner()) proc.setOwner(getOwner());
|
||||
subProcList.add(proc);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureSuspended {
|
||||
private static final Log LOG = LogFactory.getLog(TestProcedureSuspended.class);
|
||||
|
||||
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
||||
private static final Procedure NULL_PROC = null;
|
||||
|
||||
private ProcedureExecutor<TestProcEnv> procExecutor;
|
||||
private ProcedureStore procStore;
|
||||
|
||||
private HBaseCommonTestingUtility htu;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
|
||||
procStore = new NoopProcedureStore();
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
procExecutor.stop();
|
||||
procStore.stop(false);
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testSuspendWhileHoldingLocks() {
|
||||
final AtomicBoolean lockA = new AtomicBoolean(false);
|
||||
final AtomicBoolean lockB = new AtomicBoolean(false);
|
||||
|
||||
final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
|
||||
final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
|
||||
final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);
|
||||
|
||||
procExecutor.submitProcedure(p1keyA);
|
||||
procExecutor.submitProcedure(p2keyA);
|
||||
procExecutor.submitProcedure(p3keyB);
|
||||
|
||||
// first run p1, p3 are able to run p2 is blocked by p1
|
||||
waitAndAssertTimestamp(p1keyA, 1, 1);
|
||||
waitAndAssertTimestamp(p2keyA, 0, -1);
|
||||
waitAndAssertTimestamp(p3keyB, 1, 2);
|
||||
assertEquals(true, lockA.get());
|
||||
assertEquals(true, lockB.get());
|
||||
|
||||
// release p3
|
||||
p3keyB.setThrowSuspend(false);
|
||||
procExecutor.getRunnableSet().addFront(p3keyB);
|
||||
waitAndAssertTimestamp(p1keyA, 1, 1);
|
||||
waitAndAssertTimestamp(p2keyA, 0, -1);
|
||||
waitAndAssertTimestamp(p3keyB, 2, 3);
|
||||
assertEquals(true, lockA.get());
|
||||
|
||||
// wait until p3 is fully completed
|
||||
ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
|
||||
assertEquals(false, lockB.get());
|
||||
|
||||
// rollback p2 and wait until is fully completed
|
||||
p1keyA.setTriggerRollback(true);
|
||||
procExecutor.getRunnableSet().addFront(p1keyA);
|
||||
ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
|
||||
|
||||
// p2 should start and suspend
|
||||
waitAndAssertTimestamp(p1keyA, 4, 60000);
|
||||
waitAndAssertTimestamp(p2keyA, 1, 7);
|
||||
waitAndAssertTimestamp(p3keyB, 2, 3);
|
||||
assertEquals(true, lockA.get());
|
||||
|
||||
// wait until p2 is fully completed
|
||||
p2keyA.setThrowSuspend(false);
|
||||
procExecutor.getRunnableSet().addFront(p2keyA);
|
||||
ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
|
||||
waitAndAssertTimestamp(p1keyA, 4, 60000);
|
||||
waitAndAssertTimestamp(p2keyA, 2, 8);
|
||||
waitAndAssertTimestamp(p3keyB, 2, 3);
|
||||
assertEquals(false, lockA.get());
|
||||
assertEquals(false, lockB.get());
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testYieldWhileHoldingLocks() {
|
||||
final AtomicBoolean lock = new AtomicBoolean(false);
|
||||
|
||||
final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
|
||||
final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);
|
||||
|
||||
procExecutor.submitProcedure(p1);
|
||||
procExecutor.submitProcedure(p2);
|
||||
|
||||
// try to execute a bunch of yield on p1, p2 should be blocked
|
||||
while (p1.getTimestamps().size() < 100) Threads.sleep(10);
|
||||
assertEquals(0, p2.getTimestamps().size());
|
||||
|
||||
// wait until p1 is completed
|
||||
p1.setThrowYield(false);
|
||||
ProcedureTestingUtility.waitProcedure(procExecutor, p1);
|
||||
|
||||
// try to execute a bunch of yield on p2
|
||||
while (p2.getTimestamps().size() < 100) Threads.sleep(10);
|
||||
assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
|
||||
p2.getTimestamps().get(0).longValue());
|
||||
|
||||
// wait until p2 is completed
|
||||
p1.setThrowYield(false);
|
||||
ProcedureTestingUtility.waitProcedure(procExecutor, p1);
|
||||
}
|
||||
|
||||
private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
|
||||
final ArrayList<Long> timestamps = proc.getTimestamps();
|
||||
while (timestamps.size() < size) Threads.sleep(10);
|
||||
LOG.info(proc + " -> " + timestamps);
|
||||
assertEquals(size, timestamps.size());
|
||||
if (size > 0) {
|
||||
assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestLockProcedure extends Procedure<TestProcEnv> {
|
||||
private final ArrayList<Long> timestamps = new ArrayList<Long>();
|
||||
private final String key;
|
||||
|
||||
private boolean triggerRollback = false;
|
||||
private boolean throwSuspend = false;
|
||||
private boolean throwYield = false;
|
||||
private AtomicBoolean lock = null;
|
||||
private boolean hasLock = false;
|
||||
|
||||
public TestLockProcedure(final AtomicBoolean lock, final String key,
|
||||
final boolean throwYield, final boolean throwSuspend) {
|
||||
this.lock = lock;
|
||||
this.key = key;
|
||||
this.throwYield = throwYield;
|
||||
this.throwSuspend = throwSuspend;
|
||||
}
|
||||
|
||||
public void setThrowYield(final boolean throwYield) {
|
||||
this.throwYield = throwYield;
|
||||
}
|
||||
|
||||
public void setThrowSuspend(final boolean throwSuspend) {
|
||||
this.throwSuspend = throwSuspend;
|
||||
}
|
||||
|
||||
public void setTriggerRollback(final boolean triggerRollback) {
|
||||
this.triggerRollback = triggerRollback;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TestProcEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException {
|
||||
LOG.info("EXECUTE " + this + " suspend " + (lock != null));
|
||||
timestamps.add(env.nextTimestamp());
|
||||
if (triggerRollback) {
|
||||
setFailure(getClass().getSimpleName(), new Exception("injected failure"));
|
||||
} else if (throwYield) {
|
||||
throw new ProcedureYieldException();
|
||||
} else if (throwSuspend) {
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(final TestProcEnv env) {
|
||||
LOG.info("ROLLBACK " + this);
|
||||
timestamps.add(env.nextTimestamp() * 10000);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean acquireLock(final TestProcEnv env) {
|
||||
if ((hasLock = lock.compareAndSet(false, true))) {
|
||||
LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
|
||||
}
|
||||
return hasLock;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(final TestProcEnv env) {
|
||||
LOG.info("RELEASE LOCK " + this + " " + hasLock);
|
||||
lock.set(false);
|
||||
hasLock = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean holdLock(final TestProcEnv env) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean hasLock(final TestProcEnv env) {
|
||||
return hasLock;
|
||||
}
|
||||
|
||||
public ArrayList<Long> getTimestamps() {
|
||||
return timestamps;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toStringClassDetails(StringBuilder builder) {
|
||||
builder.append(getClass().getName());
|
||||
builder.append("(" + key + ")");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcEnv env) { return false; }
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(final OutputStream stream) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(final InputStream stream) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestProcEnv {
|
||||
public final AtomicLong timestamp = new AtomicLong(0);
|
||||
|
||||
public long nextTimestamp() {
|
||||
return timestamp.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -297,7 +297,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
|
||||
boolean tableDeleted;
|
||||
if (proc.hasException()) {
|
||||
IOException procEx = proc.getException().unwrapRemoteException();
|
||||
Exception procEx = proc.getException().unwrapRemoteException();
|
||||
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
|
||||
// create failed because the table already exist
|
||||
tableDeleted = !(procEx instanceof TableExistsException);
|
||||
|
@ -1628,4 +1628,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return Math.max(1, queue.getPriority() * quantum); // TODO
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ public abstract class ProcedurePrepareLatch {
|
|||
|
||||
protected void countDown(final Procedure proc) {
|
||||
if (proc.hasException()) {
|
||||
exception = proc.getException().unwrapRemoteException();
|
||||
exception = proc.getException().unwrapRemoteIOException();
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
|
|
@ -83,7 +83,8 @@ public final class ProcedureSyncWait {
|
|||
if (result.isFailed()) {
|
||||
// If the procedure fails, we should always have an exception captured. Throw it.
|
||||
throw RemoteProcedureException.fromProto(
|
||||
result.getForeignExceptionMessage().getForeignExchangeMessage()).unwrapRemoteException();
|
||||
result.getForeignExceptionMessage().getForeignExchangeMessage())
|
||||
.unwrapRemoteIOException();
|
||||
}
|
||||
return result.getResult();
|
||||
} else {
|
||||
|
|
|
@ -18,6 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -29,9 +34,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -173,4 +182,85 @@ public class TestMasterProcedureEvents {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testTimeoutEventProcedure() throws Exception {
|
||||
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
|
||||
MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
|
||||
|
||||
TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5);
|
||||
procExec.submitProcedure(proc);
|
||||
|
||||
ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId());
|
||||
ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId()));
|
||||
}
|
||||
|
||||
public static class TestTimeoutEventProcedure
|
||||
extends Procedure<MasterProcedureEnv> implements TableProcedureInterface {
|
||||
private final ProcedureEvent event = new ProcedureEvent("timeout-event");
|
||||
|
||||
private final AtomicInteger ntimeouts = new AtomicInteger(0);
|
||||
private int maxTimeouts = 1;
|
||||
|
||||
public TestTimeoutEventProcedure() {}
|
||||
|
||||
public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
|
||||
this.maxTimeouts = maxTimeouts;
|
||||
setTimeout(timeoutMsec);
|
||||
setOwner("test");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final MasterProcedureEnv env)
|
||||
throws ProcedureSuspendedException {
|
||||
LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
|
||||
if (ntimeouts.get() > maxTimeouts) {
|
||||
setAbortFailure("test", "give up after " + ntimeouts.get());
|
||||
return null;
|
||||
}
|
||||
|
||||
env.getProcedureQueue().suspendEvent(event);
|
||||
if (env.getProcedureQueue().waitEvent(event, this)) {
|
||||
setState(ProcedureState.WAITING_TIMEOUT);
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(final MasterProcedureEnv env) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
|
||||
int n = ntimeouts.incrementAndGet();
|
||||
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
|
||||
setState(ProcedureState.RUNNABLE);
|
||||
env.getProcedureQueue().wakeEvent(event);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return TableName.valueOf("testtb");
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.READ;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(MasterProcedureEnv env) { return false; }
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(final OutputStream stream) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(final InputStream stream) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue