diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java index abc336aed14..f1424ee4280 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -33,44 +33,48 @@ public class IndexLifecycleRunner { this.clusterService = clusterService; } - public void runPolicy(String policy, Index index, Settings indexSettings) { + public void runPolicy(String policy, Index index, Settings indexSettings, boolean fromClusterStateChange) { Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings); logger.warn("running policy with current-step[" + currentStep.getKey() + "]"); if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { - executeClusterStateSteps(index, policy, currentStep); + executeClusterStateSteps(index, policy, currentStep); } else if (currentStep instanceof AsyncWaitStep) { - ((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() { - - @Override - public void onResponse(boolean conditionMet) { - logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); - if (conditionMet) { - moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey()); + if (fromClusterStateChange == false) { + ((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() { + + @Override + public void onResponse(boolean conditionMet) { + logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); + if (conditionMet) { + moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey()); + } } - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); // NORELEASE implement error handling - } - - }); + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + + }); + } } else if (currentStep instanceof AsyncActionStep) { - ((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() { - - @Override - public void onResponse(boolean complete) { - logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey()); + if (fromClusterStateChange == false) { + ((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() { + + @Override + public void onResponse(boolean complete) { + logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey()); + if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey()); + } } - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); // NORELEASE implement error handling - } - }); + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + }); + } } else { throw new IllegalStateException( "Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]"); @@ -81,7 +85,7 @@ public class IndexLifecycleRunner { IndexMetaData indexMetaData = clusterState.getMetaData().index(index); Settings indexSettings = indexMetaData.getSettings(); String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings); - runPolicy(policy, index, indexSettings); + runPolicy(policy, index, indexSettings, false); } private void executeClusterStateSteps(Index index, String policy, Step step) { diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index c6577578794..f631bdb3dd0 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -97,7 +97,7 @@ public class IndexLifecycleService extends AbstractComponent scheduleJob(pollInterval); } - triggerPolicies(event.state()); + triggerPolicies(event.state(), true); } else { cancelJob(); } @@ -119,7 +119,7 @@ public class IndexLifecycleService extends AbstractComponent public void triggered(SchedulerEngine.Event event) { if (event.getJobName().equals(IndexLifecycle.NAME)) { logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime()); - triggerPolicies(clusterService.state()); + triggerPolicies(clusterService.state(), false); } } @@ -142,14 +142,14 @@ public class IndexLifecycleService extends AbstractComponent })); } - public void triggerPolicies(ClusterState clusterState) { + public void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) { // loop through all indices in cluster state and filter for ones that are // managed by the Index Lifecycle Service they have a index.lifecycle.name setting // associated to a policy clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); if (Strings.isNullOrEmpty(policyName) == false) { - lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings()); + lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings(), fromClusterStateChange); } }); } diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java index 816ca788a0a..0c5ff36ab49 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java @@ -55,7 +55,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, randomBoolean()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step))); @@ -73,7 +73,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, randomBoolean()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step))); @@ -91,7 +91,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, false); assertEquals(1, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), @@ -111,7 +111,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, false); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -128,7 +128,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, false); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -147,13 +147,31 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Settings indexSettings = Settings.builder().build(); RuntimeException exception = expectThrows(RuntimeException.class, - () -> runner.runPolicy(policyName, index, indexSettings)); + () -> runner.runPolicy(policyName, index, indexSettings, false)); assertSame(expectedException, exception.getCause()); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); } + public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { + String policyName = "async_action_policy"; + StepKey stepKey = new StepKey("phase", "action", "async_action_step"); + MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); + Exception expectedException = new RuntimeException(); + step.setException(expectedException); + PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService); + Index index = new Index("my_index", "my_index_id"); + Settings indexSettings = Settings.builder().build(); + + runner.runPolicy(policyName, index, indexSettings, true); + + assertEquals(0, step.getExecuteCount()); + Mockito.verifyZeroInteractions(clusterService); + } + public void testRunPolicyAsyncWaitStepCompletes() { String policyName = "async_wait_policy"; StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); @@ -165,7 +183,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, false); assertEquals(1, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), @@ -184,7 +202,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings); + runner.runPolicy(policyName, index, indexSettings, false); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -203,13 +221,31 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Settings indexSettings = Settings.builder().build(); RuntimeException exception = expectThrows(RuntimeException.class, - () -> runner.runPolicy(policyName, index, indexSettings)); + () -> runner.runPolicy(policyName, index, indexSettings, false)); assertSame(expectedException, exception.getCause()); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); } + public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { + String policyName = "async_wait_policy"; + StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); + MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); + Exception expectedException = new RuntimeException(); + step.setException(expectedException); + PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService); + Index index = new Index("my_index", "my_index_id"); + Settings indexSettings = Settings.builder().build(); + + runner.runPolicy(policyName, index, indexSettings, true); + + assertEquals(0, step.getExecuteCount()); + Mockito.verifyZeroInteractions(clusterService); + } + public void testRunPolicyUnknownStepType() { String policyName = "cluster_state_action_policy"; StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); @@ -221,7 +257,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Settings indexSettings = Settings.builder().build(); IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> runner.runPolicy(policyName, index, indexSettings)); + () -> runner.runPolicy(policyName, index, indexSettings, randomBoolean())); assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]", exception.getMessage()); Mockito.verifyZeroInteractions(clusterService);