Removes Cause stuff

This commit is contained in:
Colin Goodheart-Smithe 2018-04-05 17:10:56 +01:00
parent 907586c9e4
commit ed260a0430
3 changed files with 50 additions and 131 deletions

View File

@ -33,61 +33,55 @@ public class IndexLifecycleRunner {
this.clusterService = clusterService; this.clusterService = clusterService;
} }
public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) { public void runPolicy(String policy, Index index, Settings indexSettings) {
Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings); Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings);
logger.warn("running policy with current-step[" + currentStep.getKey() + "]"); logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
if (cause != Cause.SCHEDULE_TRIGGER) { executeClusterStateSteps(index, policy, currentStep);
executeClusterStateSteps(index, policy, currentStep);
}
} else if (currentStep instanceof AsyncWaitStep) { } else if (currentStep instanceof AsyncWaitStep) {
if (cause != Cause.CLUSTER_STATE_CHANGE) { ((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() {
((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() {
@Override
@Override public void onResponse(boolean conditionMet) {
public void onResponse(boolean conditionMet) { logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey());
logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); if (conditionMet) {
if (conditionMet) { moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey());
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
} }
}
@Override
public void onFailure(Exception e) { @Override
throw new RuntimeException(e); // NORELEASE implement error handling public void onFailure(Exception e) {
} throw new RuntimeException(e); // NORELEASE implement error handling
}
});
} });
} else if (currentStep instanceof AsyncActionStep) { } else if (currentStep instanceof AsyncActionStep) {
if (cause != Cause.CLUSTER_STATE_CHANGE) { ((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() {
((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() {
@Override
@Override public void onResponse(boolean complete) {
public void onResponse(boolean complete) { logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey());
logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey()); if (complete && ((AsyncActionStep) currentStep).indexSurvives()) {
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey());
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
} }
}
@Override
public void onFailure(Exception e) { @Override
throw new RuntimeException(e); // NORELEASE implement error handling public void onFailure(Exception e) {
} throw new RuntimeException(e); // NORELEASE implement error handling
}); }
} });
} else { } else {
throw new IllegalStateException( throw new IllegalStateException(
"Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]"); "Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]");
} }
} }
private void runPolicy(Index index, ClusterState clusterState, Cause cause) { private void runPolicy(Index index, ClusterState clusterState) {
IndexMetaData indexMetaData = clusterState.getMetaData().index(index); IndexMetaData indexMetaData = clusterState.getMetaData().index(index);
Settings indexSettings = indexMetaData.getSettings(); Settings indexSettings = indexMetaData.getSettings();
String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings); String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
runPolicy(policy, index, indexSettings, cause); runPolicy(policy, index, indexSettings);
} }
private void executeClusterStateSteps(Index index, String policy, Step step) { private void executeClusterStateSteps(Index index, String policy, Step step) {
@ -139,14 +133,10 @@ public class IndexLifecycleRunner {
return newClusterStateBuilder.build(); return newClusterStateBuilder.build();
} }
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) { private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) {
logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> " logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
+ nextStepKey + ". because:" + cause.name()); + nextStepKey);
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey, clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
nextStepKey, newState -> runPolicy(index, newState, cause))); nextStepKey, newState -> runPolicy(index, newState)));
}
public enum Cause {
CLUSTER_STATE_CHANGE, SCHEDULE_TRIGGER, CALLBACK;
} }
} }

View File

@ -23,7 +23,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause;
import java.io.Closeable; import java.io.Closeable;
import java.time.Clock; import java.time.Clock;
@ -98,7 +97,7 @@ public class IndexLifecycleService extends AbstractComponent
scheduleJob(pollInterval); scheduleJob(pollInterval);
} }
triggerPolicies(event.state(), Cause.CLUSTER_STATE_CHANGE); triggerPolicies(event.state());
} else { } else {
cancelJob(); cancelJob();
} }
@ -120,7 +119,7 @@ public class IndexLifecycleService extends AbstractComponent
public void triggered(SchedulerEngine.Event event) { public void triggered(SchedulerEngine.Event event) {
if (event.getJobName().equals(IndexLifecycle.NAME)) { if (event.getJobName().equals(IndexLifecycle.NAME)) {
logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime()); logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
triggerPolicies(clusterService.state(), Cause.SCHEDULE_TRIGGER); triggerPolicies(clusterService.state());
} }
} }
@ -143,14 +142,14 @@ public class IndexLifecycleService extends AbstractComponent
})); }));
} }
public void triggerPolicies(ClusterState clusterState, Cause cause) { public void triggerPolicies(ClusterState clusterState) {
// loop through all indices in cluster state and filter for ones that are // loop through all indices in cluster state and filter for ones that are
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting // managed by the Index Lifecycle Service they have a index.lifecycle.name setting
// associated to a policy // associated to a policy
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
if (Strings.isNullOrEmpty(policyName) == false) { if (Strings.isNullOrEmpty(policyName) == false) {
lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings(), cause); lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings());
} }
}); });
} }

View File

@ -23,7 +23,6 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.MockStep; import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step; import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -31,7 +30,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.function.Supplier;
public class IndexLifecycleRunnerTests extends ESTestCase { public class IndexLifecycleRunnerTests extends ESTestCase {
@ -57,29 +55,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.CLUSTER_STATE_CHANGE)); runner.runPolicy(policyName, index, indexSettings);
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step))); Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step)));
Mockito.verifyNoMoreInteractions(clusterService); Mockito.verifyNoMoreInteractions(clusterService);
} }
public void testRunPolicyClusterStateActionStepScheduleTriggerIgnored() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER);
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyClusterStateWaitStep() { public void testRunPolicyClusterStateWaitStep() {
String policyName = "cluster_state_action_policy"; String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
@ -91,29 +73,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.CLUSTER_STATE_CHANGE)); runner.runPolicy(policyName, index, indexSettings);
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step))); Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step)));
Mockito.verifyNoMoreInteractions(clusterService); Mockito.verifyNoMoreInteractions(clusterService);
} }
public void testRunPolicyClusterStateWaitStepScheduleTriggerIgnored() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER);
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncActionStepCompletes() { public void testRunPolicyAsyncActionStepCompletes() {
String policyName = "async_action_policy"; String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step"); StepKey stepKey = new StepKey("phase", "action", "async_action_step");
@ -125,7 +91,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); runner.runPolicy(policyName, index, indexSettings);
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
@ -145,7 +111,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); runner.runPolicy(policyName, index, indexSettings);
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService); Mockito.verifyZeroInteractions(clusterService);
@ -162,7 +128,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); runner.runPolicy(policyName, index, indexSettings);
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService); Mockito.verifyZeroInteractions(clusterService);
@ -181,31 +147,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
RuntimeException exception = expectThrows(RuntimeException.class, RuntimeException exception = expectThrows(RuntimeException.class,
() -> runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER))); () -> runner.runPolicy(policyName, index, indexSettings));
assertSame(expectedException, exception.getCause()); assertSame(expectedException, exception.getCause());
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService); Mockito.verifyZeroInteractions(clusterService);
} }
public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() {
String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
Exception expectedException = new RuntimeException();
step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.CLUSTER_STATE_CHANGE);
assertEquals(0, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncWaitStepCompletes() { public void testRunPolicyAsyncWaitStepCompletes() {
String policyName = "async_wait_policy"; String policyName = "async_wait_policy";
StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
@ -217,7 +165,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); runner.runPolicy(policyName, index, indexSettings);
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
@ -236,7 +184,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Index index = new Index("my_index", "my_index_id"); Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)); runner.runPolicy(policyName, index, indexSettings);
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService); Mockito.verifyZeroInteractions(clusterService);
@ -255,31 +203,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
RuntimeException exception = expectThrows(RuntimeException.class, RuntimeException exception = expectThrows(RuntimeException.class,
() -> runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER))); () -> runner.runPolicy(policyName, index, indexSettings));
assertSame(expectedException, exception.getCause()); assertSame(expectedException, exception.getCause());
assertEquals(1, step.getExecuteCount()); assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService); Mockito.verifyZeroInteractions(clusterService);
} }
public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() {
String policyName = "async_wait_policy";
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
Exception expectedException = new RuntimeException();
step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.CLUSTER_STATE_CHANGE);
assertEquals(0, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyUnknownStepType() { public void testRunPolicyUnknownStepType() {
String policyName = "cluster_state_action_policy"; String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
@ -291,7 +221,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Settings indexSettings = Settings.builder().build(); Settings indexSettings = Settings.builder().build();
IllegalStateException exception = expectThrows(IllegalStateException.class, IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER)); () -> runner.runPolicy(policyName, index, indexSettings));
assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]", assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]",
exception.getMessage()); exception.getMessage());
Mockito.verifyZeroInteractions(clusterService); Mockito.verifyZeroInteractions(clusterService);