Move to Error step if ClusterState* steps throw (#35069)

Previously, if ClusterStateActionSteps or ClusterStateWaitSteps threw an
exception executing, the exception would only be caught and logged by
the generic ClusterStateUpdateTask machinery and the index would become
stuck on that step.

Now, exceptions thrown in these steps will be caught and the index will
be moved to the Error step.
This commit is contained in:
Gordon Brown 2018-10-30 13:33:32 -06:00 committed by GitHub
parent 18c72e86c5
commit 6ecb8ff344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 2 deletions

View File

@ -92,7 +92,11 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// move the cluster state to the next step // move the cluster state to the next step
logger.trace("[{}] performing cluster state action ({}) [{}], next: [{}]", logger.trace("[{}] performing cluster state action ({}) [{}], next: [{}]",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
state = ((ClusterStateActionStep) currentStep).performAction(index, state); try {
state = ((ClusterStateActionStep) currentStep).performAction(index, state);
} catch (Exception exception) {
return moveToErrorStep(state, currentStep.getKey(), exception);
}
if (currentStep.getNextStepKey() == null) { if (currentStep.getNextStepKey() == null) {
return state; return state;
} else { } else {
@ -108,7 +112,12 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// condition again // condition again
logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); ClusterStateWaitStep.Result result;
try {
result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
} catch (Exception exception) {
return moveToErrorStep(state, currentStep.getKey(), exception);
}
if (result.isComplete()) { if (result.isComplete()) {
logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}", logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
@ -172,4 +181,12 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
throw new ElasticsearchException( throw new ElasticsearchException(
"policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].", e); "policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].", e);
} }
private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException {
logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
currentStepKey);
MoveToErrorStepUpdateTask moveToErrorStepUpdateTask = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause,
nowSupplier);
return moveToErrorStepUpdateTask.execute(state);
}
} }

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
@ -253,6 +254,44 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
assertSame(expectedException, exception.getCause()); assertSame(expectedException, exception.getCause());
} }
public void testClusterActionStepThrowsException() throws IOException {
RuntimeException thrownException = new RuntimeException("error");
firstStep.setException(thrownException);
setStateToKey(firstStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, firstStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now);
ClusterState newState = task.execute(clusterState);
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index));
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
assertThat(currentStepKey, equalTo(new StepKey(firstStepKey.getPhase(), firstStepKey.getAction(), ErrorStep.NAME)));
assertThat(firstStep.getExecuteCount(), equalTo(1L));
assertThat(secondStep.getExecuteCount(), equalTo(0L));
assertThat(task.getNextStepKey(), equalTo(secondStep.getKey()));
assertThat(lifecycleState.getPhaseTime(), nullValue());
assertThat(lifecycleState.getActionTime(), nullValue());
assertThat(lifecycleState.getStepInfo(), equalTo("{\"type\":\"runtime_exception\",\"reason\":\"error\"}"));
}
public void testClusterWaitStepThrowsException() throws IOException {
RuntimeException thrownException = new RuntimeException("error");
secondStep.setException(thrownException);
setStateToKey(firstStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, firstStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now);
ClusterState newState = task.execute(clusterState);
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index));
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
assertThat(currentStepKey, equalTo(new StepKey(firstStepKey.getPhase(), firstStepKey.getAction(), ErrorStep.NAME)));
assertThat(firstStep.getExecuteCount(), equalTo(1L));
assertThat(secondStep.getExecuteCount(), equalTo(1L));
assertThat(task.getNextStepKey(), equalTo(thirdStepKey));
assertThat(lifecycleState.getPhaseTime(), nullValue());
assertThat(lifecycleState.getActionTime(), nullValue());
assertThat(lifecycleState.getStepInfo(), equalTo("{\"type\":\"runtime_exception\",\"reason\":\"error\"}"));
}
private void setStateToKey(StepKey stepKey) throws IOException { private void setStateToKey(StepKey stepKey) throws IOException {
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder( LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(
LifecycleExecutionState.fromIndexMetadata(clusterState.getMetaData().index(index))); LifecycleExecutionState.fromIndexMetadata(clusterState.getMetaData().index(index)));