Adds infrastructure for dealing with errors in step execution (#30341)
* Adds infrastructure for dealing with errors in step execution This change adds a new ErrorStep which a step can move to if it encounters an error it cannot automatically recover from by retrying on the next execution. The error step is special in that it cannot complete. The intention is that the user will need to call an API to man ually mocve the step in order to progress the index's lifecycle. The error step retains the phase and action names of the step before it but with the step name set to `ERROR`. For this reason no ordinary step can have this name. `AbstractStepTestCase.testStepNameNotError()` ensures that no step uses `ERROR` as the step name for either its stepKey or its nextStepKey. The new `index.lifecycle.failed_step` setting is used to store the name of the failed step so the user can know in which step the error occured. More error information will be added shortly. The async steps will now move to the error step if listener.onFailure() is called. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/LifecycleSettings.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/ErrorStep.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTask.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/PolicyStepsRegistry.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/ErrorStepTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToNextStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/PolicyStepsRegistryTests.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ErrorStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/LifecycleSettings.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/AbstractStepTestCase.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ErrorStepTests.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTask.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/PolicyStepsRegistry.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToNextStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/PolicyStepsRegistryTests.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ErrorStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/LifecycleSettings.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/AbstractStepTestCase.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ErrorStepTests.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTask.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/PolicyStepsRegistry.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToNextStepUpdateTaskTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/PolicyStepsRegistryTests.java * Addresses review comments x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ErrorStep.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ErrorStepTests.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java
This commit is contained in:
parent
6546535a8b
commit
b5ae7a1aeb
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
public class ErrorStep extends Step {
|
||||
public static final String NAME = "ERROR";
|
||||
|
||||
public ErrorStep(StepKey key) {
|
||||
super(key, key);
|
||||
if (NAME.equals(key.getName()) == false) {
|
||||
throw new IllegalArgumentException("An error step must have a step key whose step name is " + NAME);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ public class LifecycleSettings {
|
|||
public static final String LIFECYCLE_PHASE_TIME = "index.lifecycle.phase_time";
|
||||
public static final String LIFECYCLE_ACTION_TIME = "index.lifecycle.action_time";
|
||||
public static final String LIFECYCLE_STEP_TIME = "index.lifecycle.step_time";
|
||||
public static final String LIFECYCLE_FAILED_STEP = "index.lifecycle.failed_step";
|
||||
|
||||
// NORELEASE: we should probably change the default to something other than three seconds for initial release
|
||||
public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.positiveTimeSetting(LIFECYCLE_POLL_INTERVAL,
|
||||
|
@ -33,6 +34,8 @@ public class LifecycleSettings {
|
|||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_STEP_SETTING = Setting.simpleString(LIFECYCLE_STEP,
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_FAILED_STEP_SETTING = Setting.simpleString(LIFECYCLE_FAILED_STEP,
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_INDEX_CREATION_DATE_SETTING = Setting.longSetting(LIFECYCLE_INDEX_CREATION_DATE,
|
||||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_PHASE_TIME_SETTING = Setting.longSetting(LIFECYCLE_PHASE_TIME,
|
||||
|
|
|
@ -26,4 +26,12 @@ public abstract class AbstractStepTestCase<T extends Step> extends ESTestCase {
|
|||
public static StepKey randomStepKey() {
|
||||
return new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
}
|
||||
|
||||
public void testStepNameNotError() {
|
||||
T instance = createRandomInstance();
|
||||
StepKey stepKey = instance.getKey();
|
||||
assertFalse(ErrorStep.NAME.equals(stepKey.getName()));
|
||||
StepKey nextStepKey = instance.getKey();
|
||||
assertFalse(ErrorStep.NAME.equals(nextStepKey.getName()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
public class ErrorStepTests extends AbstractStepTestCase<ErrorStep> {
|
||||
|
||||
@Override
|
||||
public ErrorStep createRandomInstance() {
|
||||
StepKey stepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), ErrorStep.NAME);
|
||||
return new ErrorStep(stepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ErrorStep mutateInstance(ErrorStep instance) {
|
||||
StepKey key = instance.getKey();
|
||||
assertSame(instance.getNextStepKey(), instance.getKey());
|
||||
|
||||
key = new StepKey(key.getPhase(), key.getAction() + randomAlphaOfLength(5), key.getName());
|
||||
|
||||
return new ErrorStep(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ErrorStep copyInstance(ErrorStep instance) {
|
||||
assertSame(instance.getNextStepKey(), instance.getKey());
|
||||
return new ErrorStep(instance.getKey());
|
||||
}
|
||||
|
||||
public void testInvalidStepKey() {
|
||||
StepKey invalidKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new ErrorStep(invalidKey));
|
||||
assertEquals("An error step must have a step key whose step name is " + ErrorStep.NAME, exception.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testStepNameNotError() {
|
||||
// Need to override this test because this is the one special step that
|
||||
// is allowed to have ERROR as the step name
|
||||
}
|
||||
|
||||
}
|
|
@ -13,11 +13,11 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.Settings.Builder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
@ -45,6 +45,9 @@ public class IndexLifecycleRunner {
|
|||
logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
|
||||
if (currentStep instanceof TerminalPolicyStep) {
|
||||
logger.debug("policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "] complete, skipping execution");
|
||||
} else if (currentStep instanceof ErrorStep) {
|
||||
logger.debug(
|
||||
"policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "] on an error step, skipping execution");
|
||||
} else if (currentStep instanceof InitializePolicyContextStep || currentStep instanceof ClusterStateWaitStep) {
|
||||
executeClusterStateSteps(indexMetaData.getIndex(), policy, currentStep);
|
||||
} else if (currentStep instanceof AsyncWaitStep) {
|
||||
|
@ -61,7 +64,7 @@ public class IndexLifecycleRunner {
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e);
|
||||
}
|
||||
|
||||
});
|
||||
|
@ -80,7 +83,7 @@ public class IndexLifecycleRunner {
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -136,20 +139,42 @@ public class IndexLifecycleRunner {
|
|||
|
||||
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
|
||||
LongSupplier nowSupplier) {
|
||||
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName());
|
||||
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep, nextStep, nowSupplier);
|
||||
ClusterState.Builder newClusterStateBuilder = newClusterStateWithIndexSettings(index, clusterState, indexSettings);
|
||||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToErrorStep(Index index, ClusterState clusterState, StepKey currentStep, LongSupplier nowSupplier) {
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep,
|
||||
new StepKey(currentStep.getPhase(), currentStep.getAction(), ErrorStep.NAME), nowSupplier)
|
||||
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, currentStep.getName());
|
||||
ClusterState.Builder newClusterStateBuilder = newClusterStateWithIndexSettings(index, clusterState, indexSettings);
|
||||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
||||
private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSettings, StepKey currentStep, StepKey nextStep,
|
||||
LongSupplier nowSupplier) {
|
||||
long nowAsMillis = nowSupplier.getAsLong();
|
||||
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, nowAsMillis);
|
||||
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
|
||||
indexSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowSupplier.getAsLong());
|
||||
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);
|
||||
}
|
||||
if (currentStep.getAction().equals(nextStep.getAction()) == false) {
|
||||
indexSettings.put(LifecycleSettings.LIFECYCLE_ACTION_TIME, nowSupplier.getAsLong());
|
||||
newSettings.put(LifecycleSettings.LIFECYCLE_ACTION_TIME, nowAsMillis);
|
||||
}
|
||||
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData
|
||||
.builder(clusterState.getMetaData().index(index))
|
||||
.settings(indexSettings)));
|
||||
return newClusterStateBuilder.build();
|
||||
return newSettings;
|
||||
}
|
||||
|
||||
private static ClusterState.Builder newClusterStateWithIndexSettings(Index index, ClusterState clusterState,
|
||||
Settings.Builder newSettings) {
|
||||
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
|
||||
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData())
|
||||
.put(IndexMetaData.builder(clusterState.getMetaData().index(index)).settings(newSettings)));
|
||||
return newClusterStateBuilder;
|
||||
}
|
||||
|
||||
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) {
|
||||
|
@ -158,4 +183,10 @@ public class IndexLifecycleRunner {
|
|||
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
|
||||
nextStepKey, nowSupplier, newState -> runPolicy(newState.getMetaData().index(index), newState)));
|
||||
}
|
||||
|
||||
private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) {
|
||||
logger.error("policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + currentStepKey
|
||||
+ "]. Moving to ERROR step.", e);
|
||||
clusterService.submitStateUpdateTask("ILM", new MoveToErrorStepUpdateTask(index, policy, currentStepKey, nowSupplier));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
||||
private final Index index;
|
||||
private final String policy;
|
||||
private final Step.StepKey currentStepKey;
|
||||
private LongSupplier nowSupplier;
|
||||
|
||||
public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey,
|
||||
LongSupplier nowSupplier) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.nowSupplier = nowSupplier;
|
||||
}
|
||||
|
||||
Index getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
String getPolicy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
Step.StepKey getCurrentStepKey() {
|
||||
return currentStepKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
|
||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings))) {
|
||||
return IndexLifecycleRunner.moveClusterStateToErrorStep(index, currentState, currentStepKey, nowSupplier);
|
||||
} else {
|
||||
// either the policy has changed or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
// either case we don't want to do anything now
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
@ -69,6 +70,7 @@ public class PolicyStepsRegistry {
|
|||
stepMap.put(policy.getName(), new HashMap<>());
|
||||
Map<Step.StepKey, Step> stepMapForPolicy = stepMap.get(policy.getName());
|
||||
for (Step step : policyAsSteps) {
|
||||
assert ErrorStep.NAME.equals(step.getKey().getName()) == false;
|
||||
stepMapForPolicy.put(step.getKey(), step);
|
||||
}
|
||||
}
|
||||
|
@ -92,6 +94,9 @@ public class PolicyStepsRegistry {
|
|||
* @return step
|
||||
*/
|
||||
public Step getStep(String policy, Step.StepKey stepKey) {
|
||||
if (ErrorStep.NAME.equals(stepKey.getName())) {
|
||||
return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
|
||||
}
|
||||
Map<Step.StepKey, Step> steps = stepMap.get(policy);
|
||||
if (steps == null) {
|
||||
throw new IllegalStateException("policy [" + policy + "] does not exist");
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
|
@ -59,7 +60,25 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyClusterStateActionStep() {
|
||||
public void testRunPolicyErrorStep() {
|
||||
String policyName = "async_action_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
||||
MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, stepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, stepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, ErrorStep.NAME))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyInitializePolicyContextStep() {
|
||||
String policyName = "cluster_state_action_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
||||
MockInitializePolicyContextStep step = new MockInitializePolicyContextStep(stepKey, null);
|
||||
|
@ -158,14 +177,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
RuntimeException exception = expectThrows(RuntimeException.class,
|
||||
() -> runner.runPolicy(policyName, indexMetaData, null, false));
|
||||
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||
|
||||
assertSame(expectedException, exception.getCause());
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey)));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() {
|
||||
|
@ -234,12 +253,12 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
RuntimeException exception = expectThrows(RuntimeException.class,
|
||||
() -> runner.runPolicy(policyName, indexMetaData, null, false));
|
||||
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||
|
||||
assertSame(expectedException, exception.getCause());
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey)));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() {
|
||||
|
@ -510,6 +529,20 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToErrorStep() {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
ClusterState clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, () -> now);
|
||||
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, now);
|
||||
}
|
||||
|
||||
private ClusterState buildClusterState(String indexName, Settings.Builder indexSettingsBuilder) {
|
||||
Settings indexSettings = indexSettingsBuilder.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
|
@ -543,6 +576,28 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
} else {
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
assertFalse(LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.exists(newIndexSettings));
|
||||
}
|
||||
|
||||
private void assertClusterStateOnErrorStep(ClusterState oldClusterState, Index index, StepKey currentStep, ClusterState newClusterState,
|
||||
long now) {
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
MetaData newMetadata = newClusterState.metaData();
|
||||
assertNotSame(oldClusterState.metaData(), newMetadata);
|
||||
IndexMetaData newIndexMetadata = newMetadata.getIndexSafe(index);
|
||||
assertNotSame(oldClusterState.metaData().index(index), newIndexMetadata);
|
||||
Settings newIndexSettings = newIndexMetadata.getSettings();
|
||||
assertNotSame(oldClusterState.metaData().index(index).getSettings(), newIndexSettings);
|
||||
assertEquals(currentStep.getPhase(), LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(newIndexSettings));
|
||||
assertEquals(currentStep.getAction(), LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(newIndexSettings));
|
||||
assertEquals(ErrorStep.NAME, LifecycleSettings.LIFECYCLE_STEP_SETTING.get(newIndexSettings));
|
||||
assertEquals(currentStep.getName(), LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(newIndexSettings));
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
|
||||
private static class MockAsyncActionStep extends AsyncActionStep {
|
||||
|
@ -711,6 +766,30 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
|
||||
}
|
||||
|
||||
private static class MoveToErrorStepUpdateTaskMatcher extends ArgumentMatcher<MoveToErrorStepUpdateTask> {
|
||||
|
||||
private Index index;
|
||||
private String policy;
|
||||
private StepKey currentStepKey;
|
||||
|
||||
MoveToErrorStepUpdateTaskMatcher(Index index, String policy, StepKey currentStepKey) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
if (argument == null || argument instanceof MoveToErrorStepUpdateTask == false) {
|
||||
return false;
|
||||
}
|
||||
MoveToErrorStepUpdateTask task = (MoveToErrorStepUpdateTask) argument;
|
||||
return Objects.equals(index, task.getIndex()) && Objects.equals(policy, task.getPolicy())
|
||||
&& Objects.equals(currentStepKey, task.getCurrentStepKey());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class ExecuteStepsUpdateTaskMatcher extends ArgumentMatcher<ExecuteStepsUpdateTask> {
|
||||
|
||||
private Index index;
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.junit.Before;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
String policy;
|
||||
ClusterState clusterState;
|
||||
Index index;
|
||||
|
||||
@Before
|
||||
public void setupClusterState() {
|
||||
policy = randomAlphaOfLength(10);
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
index = indexMetadata.getIndex();
|
||||
MetaData metaData = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT).build())
|
||||
.put(IndexMetaData.builder(indexMetadata))
|
||||
.build();
|
||||
clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
||||
}
|
||||
|
||||
public void testExecuteSuccessfullyMoved() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(actualKey, equalTo(new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), ErrorStep.NAME)));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(newState.metaData().index(index).getSettings()),
|
||||
equalTo(currentStepKey.getName()));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentStep() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(notCurrentStepKey);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentPolicy() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(currentStepKey);
|
||||
setStatePolicy("not-" + policy);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
private void setStatePolicy(String policy) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
.updateSettings(Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy).build(), index.getName())).build();
|
||||
|
||||
}
|
||||
private void setStateToKey(StepKey stepKey) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
.updateSettings(Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, stepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, stepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, stepKey.getName()).build(), index.getName())).build();
|
||||
}
|
||||
}
|
|
@ -57,20 +57,28 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
|||
assertThat(actualKey, equalTo(nextStepKey));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
task.clusterStateProcessed("source", clusterState, newState);
|
||||
assertTrue(changed.get());
|
||||
}
|
||||
|
||||
public void testExecuteNoop() {
|
||||
public void testExecuteNoopifferentStep() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(notCurrentStepKey);
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> {
|
||||
};
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentPolicy() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(currentStepKey);
|
||||
if (randomBoolean()) {
|
||||
setStateToKey(notCurrentStepKey);
|
||||
} else {
|
||||
setStatePolicy("not-" + policy);
|
||||
}
|
||||
setStatePolicy("not-" + policy);
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> {};
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyTests;
|
||||
|
@ -25,6 +26,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class PolicyStepsRegistryTests extends ESTestCase {
|
||||
private static final Step.StepKey MOCK_STEP_KEY = new Step.StepKey("mock", "mock", "mock");
|
||||
|
@ -35,7 +37,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
|||
Map<String, Step> firstStepMap = Collections.singletonMap(policyName, expectedFirstStep);
|
||||
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null);
|
||||
Step actualFirstStep = registry.getFirstStep(policyName);
|
||||
assertThat(actualFirstStep, equalTo(expectedFirstStep));
|
||||
assertThat(actualFirstStep, sameInstance(expectedFirstStep));
|
||||
}
|
||||
|
||||
public void testGetFirstStepUnknownPolicy() {
|
||||
|
@ -54,6 +56,17 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
|||
Collections.singletonMap(policyName, Collections.singletonMap(MOCK_STEP_KEY, expectedStep));
|
||||
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, stepMap);
|
||||
Step actualStep = registry.getStep(policyName, MOCK_STEP_KEY);
|
||||
assertThat(actualStep, sameInstance(expectedStep));
|
||||
}
|
||||
|
||||
public void testGetStepErrorStep() {
|
||||
String policyName = randomAlphaOfLengthBetween(2, 10);
|
||||
Step.StepKey errorStepKey = new Step.StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), ErrorStep.NAME);
|
||||
Step expectedStep = new ErrorStep(errorStepKey);
|
||||
Map<String, Map<Step.StepKey, Step>> stepMap = Collections.singletonMap(policyName,
|
||||
Collections.singletonMap(MOCK_STEP_KEY, expectedStep));
|
||||
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, stepMap);
|
||||
Step actualStep = registry.getStep(policyName, errorStepKey);
|
||||
assertThat(actualStep, equalTo(expectedStep));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue