Adds more unit tests for IndexLifecycleRunner

This commit is contained in:
Colin Goodheart-Smithe 2018-04-04 15:59:10 +01:00
parent 9ef26dbe51
commit a8dc8a7c98
7 changed files with 494 additions and 14 deletions

View File

@ -21,6 +21,10 @@ public abstract class AsyncActionStep extends Step {
return client;
}
public boolean indexSurvives() {
return true;
}
public abstract void performAction(Index index, Listener listener);
public static interface Listener {

View File

@ -16,4 +16,8 @@ public abstract class ClusterStateActionStep extends Step {
public abstract ClusterState performAction(Index index, ClusterState clusterState);
public boolean indexSurvives() {
return true;
}
}

View File

@ -5,9 +5,6 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import java.util.Objects;
/**
@ -30,10 +27,6 @@ public abstract class Step {
return nextStepKey;
}
public boolean indexSurvives() {
return true;
}
@Override
public int hashCode() {
return Objects.hash(key, nextStepKey);

View File

@ -14,9 +14,6 @@ import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import java.util.function.BiFunction;
import java.util.function.Function;
public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = ESLoggerFactory.getLogger(ExecuteStepsUpdateTask.class);
private final String policy;
@ -31,6 +28,18 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
this.policyStepsRegistry = policyStepsRegistry;
}
String getPolicy() {
return policy;
}
Index getIndex() {
return index;
}
Step getStartStep() {
return startStep;
}
@Override
public ClusterState execute(ClusterState currentState) {

View File

@ -66,7 +66,7 @@ public class IndexLifecycleRunner {
@Override
public void onResponse(boolean complete) {
logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey());
if (complete && currentStep.indexSurvives()) {
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) {
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
}

View File

@ -31,6 +31,22 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
this.listener = listener;
}
Index getIndex() {
return index;
}
String getPolicy() {
return policy;
}
Step.StepKey getCurrentStepKey() {
return currentStepKey;
}
Step.StepKey getNextStepKey() {
return nextStepKey;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Settings indexSettings = currentState.getMetaData().index(index).getSettings();

View File

@ -10,25 +10,290 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.junit.Before;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
public class IndexLifecycleRunnerTests extends ESTestCase {
private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) {
SortedMap<String, LifecyclePolicy> lifecyclePolicyMap = null; // Not used in this test
Map<String, Step> firstStepMap = new HashMap<>();
firstStepMap.put(policyName, step);
Map<String, Map<StepKey, Step>> stepMap = new HashMap<>();
Map<StepKey, Step> policySteps = new HashMap<>();
policySteps.put(step.getKey(), step);
stepMap.put(policyName, policySteps);
return new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap);
}
@Before
public void prepareServices() {
public void testRunPolicyClusterStateActionStep() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.CLUSTER_STATE_CHANGE));
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step)));
Mockito.verifyNoMoreInteractions(clusterService);
}
public void testRunPolicyClusterStateActionStepScheduleTriggerIgnored() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER);
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyClusterStateWaitStep() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.CLUSTER_STATE_CHANGE));
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step)));
Mockito.verifyNoMoreInteractions(clusterService);
}
public void testRunPolicyClusterStateWaitStepScheduleTriggerIgnored() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER);
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncActionStepCompletes() {
String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER));
assertEquals(1, step.getExecuteCount());
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
Mockito.argThat(new MoveToNextStepUpdateTaskMatcher(index, policyName, stepKey, null)));
Mockito.verifyNoMoreInteractions(clusterService);
}
public void testRunPolicyAsyncActionStepCompletesIndexDestroyed() {
String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
step.setWillComplete(true);
step.setIndexSurvives(false);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER));
assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncActionStepNotComplete() {
String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
step.setWillComplete(false);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER));
assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncActionStepFails() {
String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
Exception expectedException = new RuntimeException();
step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
RuntimeException exception = expectThrows(RuntimeException.class,
() -> runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)));
assertSame(expectedException, exception.getCause());
assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() {
String policyName = "async_action_policy";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
Exception expectedException = new RuntimeException();
step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.CLUSTER_STATE_CHANGE);
assertEquals(0, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncWaitStepCompletes() {
String policyName = "async_wait_policy";
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER));
assertEquals(1, step.getExecuteCount());
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
Mockito.argThat(new MoveToNextStepUpdateTaskMatcher(index, policyName, stepKey, null)));
Mockito.verifyNoMoreInteractions(clusterService);
}
public void testRunPolicyAsyncWaitStepNotComplete() {
String policyName = "async_wait_policy";
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
step.setWillComplete(false);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER));
assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncWaitStepFails() {
String policyName = "async_wait_policy";
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
Exception expectedException = new RuntimeException();
step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
RuntimeException exception = expectThrows(RuntimeException.class,
() -> runner.runPolicy(policyName, index, indexSettings, randomFrom(Cause.CALLBACK, Cause.SCHEDULE_TRIGGER)));
assertSame(expectedException, exception.getCause());
assertEquals(1, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() {
String policyName = "async_wait_policy";
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
Exception expectedException = new RuntimeException();
step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
runner.runPolicy(policyName, index, indexSettings, Cause.CLUSTER_STATE_CHANGE);
assertEquals(0, step.getExecuteCount());
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyUnknownStepType() {
String policyName = "cluster_state_action_policy";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockStep step = new MockStep(stepKey, null);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
Index index = new Index("my_index", "my_index_id");
Settings indexSettings = Settings.builder().build();
IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> runner.runPolicy(policyName, index, indexSettings, Cause.SCHEDULE_TRIGGER));
assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]",
exception.getMessage());
Mockito.verifyZeroInteractions(clusterService);
}
public void testGetCurrentStepKey() {
@ -238,4 +503,193 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
assertEquals(nextStep.getAction(), LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(newIndexSettings));
assertEquals(nextStep.getName(), LifecycleSettings.LIFECYCLE_STEP_SETTING.get(newIndexSettings));
}
private static class MockAsyncActionStep extends AsyncActionStep {
private Exception exception;
private boolean willComplete;
private boolean indexSurvives = true;
private long executeCount = 0;
public MockAsyncActionStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey, null);
}
public void setException(Exception exception) {
this.exception = exception;
}
public void setIndexSurvives(boolean indexSurvives) {
this.indexSurvives = indexSurvives;
}
@Override
public boolean indexSurvives() {
return indexSurvives;
}
public void setWillComplete(boolean willComplete) {
this.willComplete = willComplete;
}
public long getExecuteCount() {
return executeCount;
}
@Override
public void performAction(Index index, Listener listener) {
executeCount++;
if (exception == null) {
listener.onResponse(willComplete);
} else {
listener.onFailure(exception);
}
}
}
private static class MockAsyncWaitStep extends AsyncWaitStep {
private Exception exception;
private boolean willComplete;
private long executeCount = 0;
public MockAsyncWaitStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey, null);
}
public void setException(Exception exception) {
this.exception = exception;
}
public void setWillComplete(boolean willComplete) {
this.willComplete = willComplete;
}
public long getExecuteCount() {
return executeCount;
}
@Override
public void evaluateCondition(Index index, Listener listener) {
executeCount++;
if (exception == null) {
listener.onResponse(willComplete);
} else {
listener.onFailure(exception);
}
}
}
private static class MockClusterStateActionStep extends ClusterStateActionStep {
private Exception exception;
private boolean willComplete;
private long executeCount = 0;
public MockClusterStateActionStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
public void setException(Exception exception) {
this.exception = exception;
}
public void setWillComplete(boolean willComplete) {
this.willComplete = willComplete;
}
public long getExecuteCount() {
return executeCount;
}
@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
return null;
}
}
private static class MockClusterStateWaitStep extends ClusterStateWaitStep {
private Exception exception;
private boolean willComplete;
private long executeCount = 0;
public MockClusterStateWaitStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
public void setException(Exception exception) {
this.exception = exception;
}
public void setWillComplete(boolean willComplete) {
this.willComplete = willComplete;
}
public long getExecuteCount() {
return executeCount;
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
return false;
}
}
private static class MoveToNextStepUpdateTaskMatcher extends ArgumentMatcher<MoveToNextStepUpdateTask> {
private Index index;
private String policy;
private StepKey currentStepKey;
private StepKey nextStepKey;
public MoveToNextStepUpdateTaskMatcher(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) {
this.index = index;
this.policy = policy;
this.currentStepKey = currentStepKey;
this.nextStepKey = nextStepKey;
}
@Override
public boolean matches(Object argument) {
if (argument == null || argument instanceof MoveToNextStepUpdateTask == false) {
return false;
}
MoveToNextStepUpdateTask task = (MoveToNextStepUpdateTask) argument;
return Objects.equals(index, task.getIndex()) &&
Objects.equals(policy, task.getPolicy()) &&
Objects.equals(currentStepKey, task.getCurrentStepKey()) &&
Objects.equals(nextStepKey, task.getNextStepKey());
}
}
private static class ExecuteStepsUpdateTaskMatcher extends ArgumentMatcher<ExecuteStepsUpdateTask> {
private Index index;
private String policy;
private Step startStep;
public ExecuteStepsUpdateTaskMatcher(Index index, String policy, Step startStep) {
this.index = index;
this.policy = policy;
this.startStep = startStep;
}
@Override
public boolean matches(Object argument) {
if (argument == null || argument instanceof ExecuteStepsUpdateTask == false) {
return false;
}
ExecuteStepsUpdateTask task = (ExecuteStepsUpdateTask) argument;
return Objects.equals(index, task.getIndex()) &&
Objects.equals(policy, task.getPolicy()) &&
Objects.equals(startStep, task.getStartStep());
}
}
}