replaces cause with boolean
This commit is contained in:
parent
ed260a0430
commit
05c2c5655b
|
@ -33,44 +33,48 @@ public class IndexLifecycleRunner {
|
|||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
public void runPolicy(String policy, Index index, Settings indexSettings) {
|
||||
public void runPolicy(String policy, Index index, Settings indexSettings, boolean fromClusterStateChange) {
|
||||
Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings);
|
||||
logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
|
||||
if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
||||
executeClusterStateSteps(index, policy, currentStep);
|
||||
executeClusterStateSteps(index, policy, currentStep);
|
||||
} else if (currentStep instanceof AsyncWaitStep) {
|
||||
((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey());
|
||||
if (conditionMet) {
|
||||
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||
if (fromClusterStateChange == false) {
|
||||
((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey());
|
||||
if (conditionMet) {
|
||||
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
} else if (currentStep instanceof AsyncActionStep) {
|
||||
((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey());
|
||||
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) {
|
||||
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||
if (fromClusterStateChange == false) {
|
||||
((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey());
|
||||
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) {
|
||||
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException(
|
||||
"Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]");
|
||||
|
@ -81,7 +85,7 @@ public class IndexLifecycleRunner {
|
|||
IndexMetaData indexMetaData = clusterState.getMetaData().index(index);
|
||||
Settings indexSettings = indexMetaData.getSettings();
|
||||
String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||
runPolicy(policy, index, indexSettings);
|
||||
runPolicy(policy, index, indexSettings, false);
|
||||
}
|
||||
|
||||
private void executeClusterStateSteps(Index index, String policy, Step step) {
|
||||
|
|
|
@ -97,7 +97,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
scheduleJob(pollInterval);
|
||||
}
|
||||
|
||||
triggerPolicies(event.state());
|
||||
triggerPolicies(event.state(), true);
|
||||
} else {
|
||||
cancelJob();
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
public void triggered(SchedulerEngine.Event event) {
|
||||
if (event.getJobName().equals(IndexLifecycle.NAME)) {
|
||||
logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
|
||||
triggerPolicies(clusterService.state());
|
||||
triggerPolicies(clusterService.state(), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,14 +142,14 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}));
|
||||
}
|
||||
|
||||
public void triggerPolicies(ClusterState clusterState) {
|
||||
public void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) {
|
||||
// loop through all indices in cluster state and filter for ones that are
|
||||
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
|
||||
// associated to a policy
|
||||
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings());
|
||||
lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings(), fromClusterStateChange);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, randomBoolean());
|
||||
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step)));
|
||||
|
@ -73,7 +73,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, randomBoolean());
|
||||
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(index, policyName, step)));
|
||||
|
@ -91,7 +91,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
|
@ -111,7 +111,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
|
@ -128,7 +128,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
|
@ -147,13 +147,31 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
RuntimeException exception = expectThrows(RuntimeException.class,
|
||||
() -> runner.runPolicy(policyName, index, indexSettings));
|
||||
() -> runner.runPolicy(policyName, index, indexSettings, false));
|
||||
|
||||
assertSame(expectedException, exception.getCause());
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() {
|
||||
String policyName = "async_action_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
|
||||
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
|
||||
Exception expectedException = new RuntimeException();
|
||||
step.setException(expectedException);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings, true);
|
||||
|
||||
assertEquals(0, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyAsyncWaitStepCompletes() {
|
||||
String policyName = "async_wait_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
|
||||
|
@ -165,7 +183,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
|
@ -184,7 +202,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings);
|
||||
runner.runPolicy(policyName, index, indexSettings, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
|
@ -203,13 +221,31 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
RuntimeException exception = expectThrows(RuntimeException.class,
|
||||
() -> runner.runPolicy(policyName, index, indexSettings));
|
||||
() -> runner.runPolicy(policyName, index, indexSettings, false));
|
||||
|
||||
assertSame(expectedException, exception.getCause());
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() {
|
||||
String policyName = "async_wait_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
|
||||
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
|
||||
Exception expectedException = new RuntimeException();
|
||||
step.setException(expectedException);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
Index index = new Index("my_index", "my_index_id");
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
runner.runPolicy(policyName, index, indexSettings, true);
|
||||
|
||||
assertEquals(0, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyUnknownStepType() {
|
||||
String policyName = "cluster_state_action_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
||||
|
@ -221,7 +257,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Settings indexSettings = Settings.builder().build();
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
||||
() -> runner.runPolicy(policyName, index, indexSettings));
|
||||
() -> runner.runPolicy(policyName, index, indexSettings, randomBoolean()));
|
||||
assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]",
|
||||
exception.getMessage());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
|
|
Loading…
Reference in New Issue