From ed260a04306a2e4cbb295186988732cfebb56e47 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Thu, 5 Apr 2018 17:10:56 +0100 Subject: [PATCH] Removes Cause stuff --- .../indexlifecycle/IndexLifecycleRunner.java | 82 ++++++++--------- .../indexlifecycle/IndexLifecycleService.java | 9 +- .../IndexLifecycleRunnerTests.java | 90 +++---------------- 3 files changed, 50 insertions(+), 131 deletions(-) diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java index 262573baed2..abc336aed14 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -33,61 +33,55 @@ public class IndexLifecycleRunner { this.clusterService = clusterService; } - public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) { + public void runPolicy(String policy, Index index, Settings indexSettings) { Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings); logger.warn("running policy with current-step[" + currentStep.getKey() + "]"); if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { - if (cause != Cause.SCHEDULE_TRIGGER) { - executeClusterStateSteps(index, policy, currentStep); - } + executeClusterStateSteps(index, policy, currentStep); } else if (currentStep instanceof AsyncWaitStep) { - if (cause != Cause.CLUSTER_STATE_CHANGE) { - ((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() { - - @Override - public void onResponse(boolean conditionMet) { - logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); - if (conditionMet) { - moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK); - } + ((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() { + + @Override + public void onResponse(boolean conditionMet) { + logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); + if (conditionMet) { + moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey()); } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); // NORELEASE implement error handling - } - - }); - } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + + }); } else if (currentStep instanceof AsyncActionStep) { - if (cause != Cause.CLUSTER_STATE_CHANGE) { - ((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() { - - @Override - public void onResponse(boolean complete) { - logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK); - } + ((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() { + + @Override + public void onResponse(boolean complete) { + logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey()); + if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey()); } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); // NORELEASE implement error handling - } - }); - } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + }); } else { throw new IllegalStateException( "Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]"); } } - private void runPolicy(Index index, ClusterState clusterState, Cause cause) { + private void runPolicy(Index index, ClusterState clusterState) { IndexMetaData indexMetaData = clusterState.getMetaData().index(index); Settings indexSettings = indexMetaData.getSettings(); String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings); - runPolicy(policy, index, indexSettings, cause); + runPolicy(policy, index, indexSettings); } private void executeClusterStateSteps(Index index, String policy, Step step) { @@ -139,14 +133,10 @@ public class IndexLifecycleRunner { return newClusterStateBuilder.build(); } - private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) { + private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) { logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> " - + nextStepKey + ". because:" + cause.name()); + + nextStepKey); clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey, - nextStepKey, newState -> runPolicy(index, newState, cause))); - } - - public enum Cause { - CLUSTER_STATE_CHANGE, SCHEDULE_TRIGGER, CALLBACK; + nextStepKey, newState -> runPolicy(index, newState))); } } diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 2a0e3a6d2e1..c6577578794 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -23,7 +23,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; -import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause; import java.io.Closeable; import java.time.Clock; @@ -98,7 +97,7 @@ public class IndexLifecycleService extends AbstractComponent scheduleJob(pollInterval); } - triggerPolicies(event.state(), Cause.CLUSTER_STATE_CHANGE); + triggerPolicies(event.state()); } else { cancelJob(); } @@ -120,7 +119,7 @@ public class IndexLifecycleService extends AbstractComponent public void triggered(SchedulerEngine.Event event) { if (event.getJobName().equals(IndexLifecycle.NAME)) { logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime()); - triggerPolicies(clusterService.state(), Cause.SCHEDULE_TRIGGER); + triggerPolicies(clusterService.state()); } } @@ -143,14 +142,14 @@ public class IndexLifecycleService extends AbstractComponent })); } - public void triggerPolicies(ClusterState clusterState, Cause cause) { + public void triggerPolicies(ClusterState clusterState) { // loop through all indices in cluster state and filter for ones that are // managed by the Index Lifecycle Service they have a index.lifecycle.name setting // associated to a policy clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); if (Strings.isNullOrEmpty(policyName) == false) { - lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings(), cause); + lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings()); } }); } diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java index 5206374b7e5..816ca788a0a 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.MockStep; import org.elasticsearch.xpack.core.indexlifecycle.Step; import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; -import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -31,7 +30,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.SortedMap; -import java.util.function.Supplier; public class IndexLifecycleRunnerTests extends ESTestCase { @@ -57,29 +55,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.CLUSTER_STATE_CHANGE)); + runner.runPolicy(policyName, index, indexSettings); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step))); Mockito.verifyNoMoreInteractions(clusterService); } - public void testRunPolicyClusterStateActionStepScheduleTriggerIgnored() { - String policyName = "cluster_state_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); - MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); - step.setWillComplete(true); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = Mockito.mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService); - Index index = new Index("my_index", "my_index_id"); - Settings indexSettings = Settings.builder().build(); - - runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER); - - Mockito.verifyZeroInteractions(clusterService); - } - public void testRunPolicyClusterStateWaitStep() { String policyName = "cluster_state_action_policy"; StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); @@ -91,29 +73,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.CLUSTER_STATE_CHANGE)); + runner.runPolicy(policyName, index, indexSettings); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step))); Mockito.verifyNoMoreInteractions(clusterService); } - public void testRunPolicyClusterStateWaitStepScheduleTriggerIgnored() { - String policyName = "cluster_state_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); - MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null); - step.setWillComplete(true); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = Mockito.mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService); - Index index = new Index("my_index", "my_index_id"); - Settings indexSettings = Settings.builder().build(); - - runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER); - - Mockito.verifyZeroInteractions(clusterService); - } - public void testRunPolicyAsyncActionStepCompletes() { String policyName = "async_action_policy"; StepKey stepKey = new StepKey("phase", "action", "async_action_step"); @@ -125,7 +91,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); + runner.runPolicy(policyName, index, indexSettings); assertEquals(1, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), @@ -145,7 +111,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); + runner.runPolicy(policyName, index, indexSettings); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -162,7 +128,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); + runner.runPolicy(policyName, index, indexSettings); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -181,31 +147,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Settings indexSettings = Settings.builder().build(); RuntimeException exception = expectThrows(RuntimeException.class, - () -> runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER))); + () -> runner.runPolicy(policyName, index, indexSettings)); assertSame(expectedException, exception.getCause()); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); } - public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - Exception expectedException = new RuntimeException(); - step.setException(expectedException); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = Mockito.mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService); - Index index = new Index("my_index", "my_index_id"); - Settings indexSettings = Settings.builder().build(); - - runner.runPolicy(policyName, index, indexSettings, Cause.CLUSTER_STATE_CHANGE); - - assertEquals(0, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } - public void testRunPolicyAsyncWaitStepCompletes() { String policyName = "async_wait_policy"; StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); @@ -217,7 +165,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); + runner.runPolicy(policyName, index, indexSettings); assertEquals(1, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), @@ -236,7 +184,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Index index = new Index("my_index", "my_index_id"); Settings indexSettings = Settings.builder().build(); - runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); + runner.runPolicy(policyName, index, indexSettings); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -255,31 +203,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Settings indexSettings = Settings.builder().build(); RuntimeException exception = expectThrows(RuntimeException.class, - () -> runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER))); + () -> runner.runPolicy(policyName, index, indexSettings)); assertSame(expectedException, exception.getCause()); assertEquals(1, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); } - public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - Exception expectedException = new RuntimeException(); - step.setException(expectedException); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = Mockito.mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService); - Index index = new Index("my_index", "my_index_id"); - Settings indexSettings = Settings.builder().build(); - - runner.runPolicy(policyName, index, indexSettings, Cause.CLUSTER_STATE_CHANGE); - - assertEquals(0, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } - public void testRunPolicyUnknownStepType() { String policyName = "cluster_state_action_policy"; StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); @@ -291,7 +221,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { Settings indexSettings = Settings.builder().build(); IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER)); + () -> runner.runPolicy(policyName, index, indexSettings)); assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]", exception.getMessage()); Mockito.verifyZeroInteractions(clusterService);