Throws ElasticsearchException from cluster state tasks if something
really bad happens If the `onFailure()` method is called on the cluster state task then something bad happened that we can't really deal with so this change throws an ElasticsearchException in that case and the step will be re-executed when the policy is next triggered. If we can't submit a cluster state task then we can't move to the error state so there isn't really anything else we can do here. If the cluster state task fails like this there are probably bigger issues witht he cluster anyway. x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/ExecuteStepsUpdateTask.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTask.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/MoveToNextStepUpdateTask.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/ClusterStateUpdateStepTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/ExecuteStepsUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToNextStepUpdateTaskTests.java
This commit is contained in:
parent
61d88a8fdd
commit
e9dc190479
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
|
@ -98,6 +99,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
throw new ElasticsearchException(
|
||||
"policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -56,6 +57,7 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName()
|
||||
+ "] failed trying to move from step [" + currentStepKey + "] to the ERROR step.", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -74,7 +75,8 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step ["
|
||||
+ currentStepKey + "] to step [" + nextStepKey + "].", e);
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
public class ClusterStateUpdateStepTests extends ESTestCase {
|
||||
|
||||
public void test() {
|
||||
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -167,6 +168,19 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
|||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
}
|
||||
|
||||
public void testOnFailure() {
|
||||
setStateToKey(secondStepKey);
|
||||
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
assertEquals("policy [" + mixedPolicyName + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].",
|
||||
exception.getMessage());
|
||||
assertSame(expectedException, exception.getCause());
|
||||
}
|
||||
|
||||
private void setStateToKey(StepKey stepKey) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -79,6 +80,21 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
|||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testOnFailure() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
assertEquals("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step [" + currentStepKey
|
||||
+ "] to the ERROR step.", exception.getMessage());
|
||||
assertSame(expectedException, exception.getCause());
|
||||
}
|
||||
|
||||
private void setStatePolicy(String policy) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -96,6 +97,24 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
|||
assertNull(changed.get());
|
||||
}
|
||||
|
||||
public void testOnFailure() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey nextStepKey = new StepKey("next-phase", "next-action", "next-name");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
assertEquals("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step [" + currentStepKey
|
||||
+ "] to step [" + nextStepKey + "].", exception.getMessage());
|
||||
assertSame(expectedException, exception.getCause());
|
||||
}
|
||||
|
||||
private void setStatePolicy(String policy) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
|
|
Loading…
Reference in New Issue