HBASE-13759 Procedure v2 - Improve procedure yielding
This commit is contained in:
parent
a016b23e85
commit
d86f2fa3b2
|
@ -82,10 +82,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
* The main code of the procedure. It must be idempotent since execute()
|
||||
* may be called multiple time in case of machine failure in the middle
|
||||
* of the execution.
|
||||
* @param env the environment passed to the ProcedureExecutor
|
||||
* @return a set of sub-procedures or null if there is nothing else to execute.
|
||||
* @throw ProcedureYieldException the procedure will be added back to the queue and retried later
|
||||
* @throw InterruptedException the procedure will be added back to the queue and retried later
|
||||
*/
|
||||
protected abstract Procedure[] execute(TEnvironment env)
|
||||
throws ProcedureYieldException;
|
||||
throws ProcedureYieldException, InterruptedException;
|
||||
|
||||
/**
|
||||
* The code to undo what done by the execute() code.
|
||||
|
@ -94,10 +97,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
* the execute() call. The implementation must be idempotent since rollback()
|
||||
* may be called multiple time in case of machine failure in the middle
|
||||
* of the execution.
|
||||
* @param env the environment passed to the ProcedureExecutor
|
||||
* @throws IOException temporary failure, the rollback will retry later
|
||||
* @throw InterruptedException the procedure will be added back to the queue and retried later
|
||||
*/
|
||||
protected abstract void rollback(TEnvironment env)
|
||||
throws IOException;
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* The abort() call is asynchronous and each procedure must decide how to deal
|
||||
|
@ -169,12 +174,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
}
|
||||
|
||||
/**
|
||||
* By default, the executor will run procedures start to finish. Return true to make the executor
|
||||
* yield between each flow step to give other procedures time to run their flow steps.
|
||||
* @return Return true if the executor should yield on completion of a flow state step.
|
||||
* Defaults to return false.
|
||||
* By default, the executor will try ro run procedures start to finish.
|
||||
* Return true to make the executor yield between each execution step to
|
||||
* give other procedures time to run their steps.
|
||||
* @param env the environment passed to the ProcedureExecutor
|
||||
* @return Return true if the executor should yield on completion of an execution step.
|
||||
* Defaults to return false.
|
||||
*/
|
||||
protected boolean isYieldAfterSuccessfulFlowStateStep() {
|
||||
protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -404,7 +411,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected Procedure[] doExecute(final TEnvironment env)
|
||||
throws ProcedureYieldException {
|
||||
throws ProcedureYieldException, InterruptedException {
|
||||
try {
|
||||
updateTimestamp();
|
||||
return execute(env);
|
||||
|
@ -418,7 +425,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
* user-level code rollback().
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected void doRollback(final TEnvironment env) throws IOException {
|
||||
protected void doRollback(final TEnvironment env)
|
||||
throws IOException, InterruptedException {
|
||||
try {
|
||||
updateTimestamp();
|
||||
rollback(env);
|
||||
|
|
|
@ -732,6 +732,12 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
procedureFinished(proc);
|
||||
break;
|
||||
}
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
runnables.yield(proc);
|
||||
break;
|
||||
}
|
||||
} while (procStack.isFailed());
|
||||
}
|
||||
|
||||
|
@ -828,6 +834,11 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
|
||||
subprocStack.remove(stackTail);
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize the procedure state
|
||||
|
@ -851,6 +862,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
LOG.debug("rollback attempt failed for " + proc, e);
|
||||
}
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
handleInterruptedException(proc, e);
|
||||
return false;
|
||||
} catch (Throwable e) {
|
||||
// Catch NullPointerExceptions or similar errors...
|
||||
LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
|
||||
|
@ -859,9 +873,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// allows to kill the executor before something is stored to the wal.
|
||||
// useful to test the procedure recovery.
|
||||
if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("TESTING: Kill before store update");
|
||||
}
|
||||
LOG.debug("TESTING: Kill before store update");
|
||||
stop();
|
||||
return false;
|
||||
}
|
||||
|
@ -877,6 +889,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
} else {
|
||||
store.update(proc);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -912,10 +925,14 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
} catch (ProcedureYieldException e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Yield procedure: " + procedure);
|
||||
LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
|
||||
}
|
||||
runnables.yield(procedure);
|
||||
return;
|
||||
} catch (InterruptedException e) {
|
||||
handleInterruptedException(procedure, e);
|
||||
runnables.yield(procedure);
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
// Catch NullPointerExceptions or similar errors...
|
||||
String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
|
||||
|
@ -974,9 +991,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// allows to kill the executor before something is stored to the wal.
|
||||
// useful to test the procedure recovery.
|
||||
if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("TESTING: Kill before store update");
|
||||
}
|
||||
LOG.debug("TESTING: Kill before store update");
|
||||
stop();
|
||||
return;
|
||||
}
|
||||
|
@ -999,6 +1014,11 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return;
|
||||
}
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (reExecute && procedure.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert (reExecute && subprocs == null) || !reExecute;
|
||||
} while (reExecute);
|
||||
|
||||
|
@ -1035,6 +1055,18 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
}
|
||||
|
||||
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
|
||||
}
|
||||
|
||||
// NOTE: We don't call Thread.currentThread().interrupt()
|
||||
// because otherwise all the subsequent calls e.g. Thread.sleep() will throw
|
||||
// the InterruptedException. If the master is going down, we will be notified
|
||||
// and the executor/store will be stopped.
|
||||
// (The interrupted procedure will be retried on the next run)
|
||||
}
|
||||
|
||||
private void sendProcedureLoadedNotification(final long procId) {
|
||||
if (!this.listeners.isEmpty()) {
|
||||
for (ProcedureExecutorListener listener: this.listeners) {
|
||||
|
|
|
@ -42,7 +42,7 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir
|
|||
|
||||
@Override
|
||||
protected Procedure[] doExecute(final TEnvironment env)
|
||||
throws ProcedureYieldException {
|
||||
throws ProcedureYieldException, InterruptedException {
|
||||
updateTimestamp();
|
||||
try {
|
||||
Procedure[] children = !executed ? execute(env) : null;
|
||||
|
@ -54,7 +54,8 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRollback(final TEnvironment env) throws IOException {
|
||||
protected void doRollback(final TEnvironment env)
|
||||
throws IOException, InterruptedException {
|
||||
updateTimestamp();
|
||||
if (executed) {
|
||||
try {
|
||||
|
|
|
@ -57,7 +57,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
* Flow.HAS_MORE_STATE if there is another step.
|
||||
*/
|
||||
protected abstract Flow executeFromState(TEnvironment env, TState state)
|
||||
throws ProcedureYieldException;
|
||||
throws ProcedureYieldException, InterruptedException;
|
||||
|
||||
/**
|
||||
* called to perform the rollback of the specified state
|
||||
|
@ -65,7 +65,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
* @throws IOException temporary failure, the rollback will retry later
|
||||
*/
|
||||
protected abstract void rollbackState(TEnvironment env, TState state)
|
||||
throws IOException;
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
|
||||
|
@ -95,12 +95,24 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
setNextState(getStateId(state));
|
||||
}
|
||||
|
||||
/**
|
||||
* By default, the executor will try ro run all the steps of the procedure start to finish.
|
||||
* Return true to make the executor yield between execution steps to
|
||||
* give other procedures time to run their steps.
|
||||
* @param state the state we are going to execute next.
|
||||
* @return Return true if the executor should yield before the execution of the specified step.
|
||||
* Defaults to return false.
|
||||
*/
|
||||
protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TEnvironment env)
|
||||
throws ProcedureYieldException {
|
||||
throws ProcedureYieldException, InterruptedException {
|
||||
updateTimestamp();
|
||||
try {
|
||||
TState state = stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
|
||||
TState state = getCurrentState();
|
||||
if (stateCount == 0) {
|
||||
setNextState(getStateId(state));
|
||||
}
|
||||
|
@ -115,16 +127,26 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(final TEnvironment env) throws IOException {
|
||||
protected void rollback(final TEnvironment env)
|
||||
throws IOException, InterruptedException {
|
||||
try {
|
||||
updateTimestamp();
|
||||
rollbackState(env, stateCount > 0 ? getState(states[stateCount-1]) : getInitialState());
|
||||
rollbackState(env, getCurrentState());
|
||||
stateCount--;
|
||||
} finally {
|
||||
updateTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
|
||||
return isYieldBeforeExecuteFromState(env, getCurrentState());
|
||||
}
|
||||
|
||||
private TState getCurrentState() {
|
||||
return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the next state for the procedure.
|
||||
* @param stateId the ordinal() of the state enum (or state id)
|
||||
|
|
|
@ -0,0 +1,286 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestYieldProcedures {
|
||||
private static final Log LOG = LogFactory.getLog(TestYieldProcedures.class);
|
||||
|
||||
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
||||
private static final Procedure NULL_PROC = null;
|
||||
|
||||
private ProcedureExecutor<TestProcEnv> procExecutor;
|
||||
private ProcedureStore procStore;
|
||||
|
||||
private HBaseCommonTestingUtility htu;
|
||||
private FileSystem fs;
|
||||
private Path testDir;
|
||||
private Path logDir;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
testDir = htu.getDataTestDir();
|
||||
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||
assertTrue(testDir.depth() > 1);
|
||||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
procExecutor.stop();
|
||||
procStore.stop(false);
|
||||
fs.delete(logDir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYieldEachExecutionStep() throws Exception {
|
||||
final int NUM_STATES = 3;
|
||||
|
||||
TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
|
||||
for (int i = 0; i < procs.length; ++i) {
|
||||
procs[i] = new TestStateMachineProcedure(true, false);
|
||||
procExecutor.submitProcedure(procs[i]);
|
||||
}
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
|
||||
|
||||
// verify yield during execute()
|
||||
long prevTimestamp = 0;
|
||||
for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
|
||||
for (int i = 0; i < procs.length; ++i) {
|
||||
assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
|
||||
TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(execStep);
|
||||
LOG.info("i=" + i + " execStep=" + execStep + " timestamp=" + info.getTimestamp());
|
||||
assertEquals(false, info.isRollback());
|
||||
assertEquals(execStep, info.getStep().ordinal());
|
||||
assertEquals(prevTimestamp + 1, info.getTimestamp());
|
||||
prevTimestamp++;
|
||||
}
|
||||
}
|
||||
|
||||
// verify yield during rollback()
|
||||
int count = NUM_STATES;
|
||||
for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
|
||||
for (int i = 0; i < procs.length; ++i) {
|
||||
assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
|
||||
TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(count);
|
||||
LOG.info("i=" + i + " execStep=" + execStep + " timestamp=" + info.getTimestamp());
|
||||
assertEquals(true, info.isRollback());
|
||||
assertEquals(execStep, info.getStep().ordinal());
|
||||
assertEquals(prevTimestamp + 1, info.getTimestamp());
|
||||
prevTimestamp++;
|
||||
}
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYieldOnInterrupt() throws Exception {
|
||||
final int NUM_STATES = 3;
|
||||
int count = 0;
|
||||
|
||||
TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
|
||||
ProcedureTestingUtility.submitAndWait(procExecutor, proc);
|
||||
|
||||
// test execute (we execute steps twice, one has the IE the other completes)
|
||||
assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
|
||||
for (int i = 0; i < NUM_STATES; ++i) {
|
||||
TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
|
||||
assertEquals(false, info.isRollback());
|
||||
assertEquals(i, info.getStep().ordinal());
|
||||
|
||||
info = proc.getExecutionInfo().get(count++);
|
||||
assertEquals(false, info.isRollback());
|
||||
assertEquals(i, info.getStep().ordinal());
|
||||
}
|
||||
|
||||
// test rollback (we execute steps twice, one has the IE the other completes)
|
||||
for (int i = NUM_STATES - 1; i >= 0; --i) {
|
||||
TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
|
||||
assertEquals(true, info.isRollback());
|
||||
assertEquals(i, info.getStep().ordinal());
|
||||
|
||||
info = proc.getExecutionInfo().get(count++);
|
||||
assertEquals(true, info.isRollback());
|
||||
assertEquals(i, info.getStep().ordinal());
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestProcEnv {
|
||||
public final AtomicLong timestamp = new AtomicLong(0);
|
||||
|
||||
public long nextTimestamp() {
|
||||
return timestamp.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestStateMachineProcedure
|
||||
extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
|
||||
enum State { STATE_1, STATE_2, STATE_3 }
|
||||
|
||||
public class ExecutionInfo {
|
||||
private final boolean rollback;
|
||||
private final long timestamp;
|
||||
private final State step;
|
||||
|
||||
public ExecutionInfo(long timestamp, State step, boolean isRollback) {
|
||||
this.timestamp = timestamp;
|
||||
this.step = step;
|
||||
this.rollback = isRollback;
|
||||
}
|
||||
|
||||
public State getStep() { return step; }
|
||||
public long getTimestamp() { return timestamp; }
|
||||
public boolean isRollback() { return rollback; }
|
||||
}
|
||||
|
||||
private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<ExecutionInfo>();
|
||||
private final AtomicBoolean aborted = new AtomicBoolean(false);
|
||||
private final boolean throwInterruptOnceOnEachStep;
|
||||
private final boolean abortOnFinalStep;
|
||||
|
||||
public TestStateMachineProcedure() {
|
||||
this(false, false);
|
||||
}
|
||||
|
||||
public TestStateMachineProcedure(boolean abortOnFinalStep,
|
||||
boolean throwInterruptOnceOnEachStep) {
|
||||
this.abortOnFinalStep = abortOnFinalStep;
|
||||
this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
|
||||
}
|
||||
|
||||
public ArrayList<ExecutionInfo> getExecutionInfo() {
|
||||
return executionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
|
||||
throws InterruptedException {
|
||||
LOG.info("execute step " + state);
|
||||
executionInfo.add(new ExecutionInfo(env.nextTimestamp(), state, false));
|
||||
Thread.sleep(150);
|
||||
|
||||
if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
|
||||
LOG.debug("THROW INTERRUPT");
|
||||
throw new InterruptedException("test interrupt");
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case STATE_1:
|
||||
setNextState(State.STATE_2);
|
||||
break;
|
||||
case STATE_2:
|
||||
setNextState(State.STATE_3);
|
||||
break;
|
||||
case STATE_3:
|
||||
if (abortOnFinalStep) {
|
||||
setFailure("test", new IOException("Requested abort on final step"));
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(TestProcEnv env, final State state)
|
||||
throws InterruptedException {
|
||||
LOG.debug("rollback state " + state);
|
||||
executionInfo.add(new ExecutionInfo(env.nextTimestamp(), state, true));
|
||||
Thread.sleep(150);
|
||||
|
||||
if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
|
||||
LOG.debug("THROW INTERRUPT");
|
||||
throw new InterruptedException("test interrupt");
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case STATE_1:
|
||||
break;
|
||||
case STATE_2:
|
||||
break;
|
||||
case STATE_3:
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected State getState(final int stateId) {
|
||||
return State.values()[stateId];
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(final State state) {
|
||||
return state.ordinal();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected State getInitialState() {
|
||||
return State.STATE_1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcEnv env) {
|
||||
aborted.set(true);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -80,7 +80,8 @@ public class AddColumnFamilyProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final AddColumnFamilyState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final AddColumnFamilyState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -109,7 +110,7 @@ public class AddColumnFamilyProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error trying to add the column family" + getColumnFamilyName() + " to the table "
|
||||
+ tableName + " (in state=" + state + ")", e);
|
||||
|
||||
|
|
|
@ -94,7 +94,8 @@ public class CreateTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableState state)
|
||||
throws InterruptedException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -135,7 +136,7 @@ public class CreateTableProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error trying to create table=" + getTableName() + " state=" + state, e);
|
||||
setFailure("master-create-table", e);
|
||||
}
|
||||
|
|
|
@ -81,7 +81,8 @@ public class DeleteColumnFamilyProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, DeleteColumnFamilyState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, DeleteColumnFamilyState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -114,7 +115,7 @@ public class DeleteColumnFamilyProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
if (!isRollbackSupported(state)) {
|
||||
// We reach a state that cannot be rolled back. We just need to keep retry.
|
||||
LOG.warn("Error trying to delete the column family " + getColumnFamilyName()
|
||||
|
|
|
@ -90,7 +90,8 @@ public class DeleteTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState state)
|
||||
throws InterruptedException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -146,9 +147,6 @@ public class DeleteTableProcedure
|
|||
}
|
||||
} catch (HBaseException|IOException e) {
|
||||
LOG.warn("Retriable error trying to delete table=" + getTableName() + " state=" + state, e);
|
||||
} catch (InterruptedException e) {
|
||||
// if the interrupt is real, the executor will be stopped.
|
||||
LOG.warn("Interrupted trying to delete table=" + getTableName() + " state=" + state, e);
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
|
|
|
@ -120,7 +120,8 @@ public class DisableTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final DisableTableState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final DisableTableState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -161,7 +162,7 @@ public class DisableTableProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Retriable error trying to disable table=" + tableName + " state=" + state, e);
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
|
|
|
@ -118,7 +118,8 @@ public class EnableTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final EnableTableState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final EnableTableState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -155,7 +156,7 @@ public class EnableTableProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error trying to enable table=" + tableName + " state=" + state, e);
|
||||
setFailure("master-enable-table", e);
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ public class ModifyColumnFamilyProcedure
|
|||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env,
|
||||
final ModifyColumnFamilyState state) {
|
||||
final ModifyColumnFamilyState state) throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ public class ModifyColumnFamilyProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error trying to modify the column family " + getColumnFamilyName()
|
||||
+ " of the table " + tableName + "(in state=" + state + ")", e);
|
||||
|
||||
|
|
|
@ -87,7 +87,8 @@ public class ModifyTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -128,7 +129,7 @@ public class ModifyTableProcedure
|
|||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (InterruptedException|IOException e) {
|
||||
} catch (IOException e) {
|
||||
if (!isRollbackSupported(state)) {
|
||||
// We reach a state that cannot be rolled back. We just need to keep retry.
|
||||
LOG.warn("Error trying to modify table=" + getTableName() + " state=" + state, e);
|
||||
|
|
|
@ -745,7 +745,7 @@ implements ServerProcedureInterface {
|
|||
* stuck waiting for regions to online so it can replay edits.
|
||||
*/
|
||||
@Override
|
||||
protected boolean isYieldAfterSuccessfulFlowStateStep() {
|
||||
protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,8 @@ public class TruncateTableProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, TruncateTableState state) {
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, TruncateTableState state)
|
||||
throws InterruptedException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
@ -131,9 +132,6 @@ public class TruncateTableProcedure
|
|||
}
|
||||
} catch (HBaseException|IOException e) {
|
||||
LOG.warn("Retriable error trying to truncate table=" + getTableName() + " state=" + state, e);
|
||||
} catch (InterruptedException e) {
|
||||
// if the interrupt is real, the executor will be stopped.
|
||||
LOG.warn("Interrupted trying to truncate table=" + getTableName() + " state=" + state, e);
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue