Remove unneeded listener on MoveToNextStepUpdateTask (#33725)

There was a listener that re-runs the policy with the new state when the cluster
state is processed by the `MoveToNextStepUpdateTask`. This removes this listener
as we will execute the policy through the `IndexLifecyleService` cluster state
listener.
This commit is contained in:
Lee Hinman 2018-09-14 14:38:23 -06:00 committed by GitHub
parent 8e59de3eb2
commit 1f048d3d3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 45 deletions

View File

@ -364,7 +364,7 @@ public class IndexLifecycleRunner {
logger.debug("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
+ nextStepKey);
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
nextStepKey, nowSupplier, newState -> runPolicy(newState.getMetaData().index(index), newState)));
nextStepKey, nowSupplier));
}
private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) {

View File

@ -21,17 +21,15 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
private final String policy;
private final Step.StepKey currentStepKey;
private final Step.StepKey nextStepKey;
private final Listener listener;
private final LongSupplier nowSupplier;
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey,
LongSupplier nowSupplier, Listener listener) {
LongSupplier nowSupplier) {
this.index = index;
this.policy = policy;
this.currentStepKey = currentStepKey;
this.nextStepKey = nextStepKey;
this.nowSupplier = nowSupplier;
this.listener = listener;
}
Index getIndex() {
@ -69,24 +67,9 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// if the new cluster state is different from the old one then
// we moved to the new step in the execute method so we should
// execute the next step
if (oldState != newState) {
listener.onClusterStateProcessed(newState);
}
}
@Override
public void onFailure(String source, Exception e) {
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step ["
+ currentStepKey + "] to step [" + nextStepKey + "].", e);
}
@FunctionalInterface
public interface Listener {
void onClusterStateProcessed(ClusterState clusterState);
}
}

View File

@ -66,9 +66,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
setStateToKey(currentStepKey, now);
SetOnce<Boolean> changed = new SetOnce<>();
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now);
ClusterState newState = task.execute(clusterState);
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
assertThat(actualKey, equalTo(nextStepKey));
@ -76,7 +74,6 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
task.clusterStateProcessed("source", clusterState, newState);
assertTrue(changed.get());
}
public void testExecuteDifferentCurrentStep() {
@ -84,9 +81,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
long now = randomNonNegativeLong();
setStateToKey(notCurrentStepKey, now);
MoveToNextStepUpdateTask.Listener listener = (c) -> {
};
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now);
ClusterState newState = task.execute(clusterState);
assertSame(newState, clusterState);
}
@ -96,8 +91,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
long now = randomNonNegativeLong();
setStateToKey(currentStepKey, now);
setStatePolicy("not-" + policy);
MoveToNextStepUpdateTask.Listener listener = (c) -> {};
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now);
ClusterState newState = task.execute(clusterState);
assertSame(newState, clusterState);
}
@ -111,8 +105,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
setStateToKey(currentStepKey, now);
SetOnce<Boolean> changed = new SetOnce<>();
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, invalidNextStep, () -> now, listener);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, invalidNextStep, () -> now);
ClusterState newState = task.execute(clusterState);
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
assertThat(actualKey, equalTo(invalidNextStep));
@ -120,18 +113,6 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
task.clusterStateProcessed("source", clusterState, newState);
assertTrue(changed.get());
}
public void testClusterProcessedWithNoChange() {
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
long now = randomNonNegativeLong();
setStateToKey(currentStepKey, now);
SetOnce<Boolean> changed = new SetOnce<>();
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
task.clusterStateProcessed("source", clusterState, clusterState);
assertNull(changed.get());
}
public void testOnFailure() {
@ -141,9 +122,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
setStateToKey(currentStepKey, now);
SetOnce<Boolean> changed = new SetOnce<>();
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now);
Exception expectedException = new RuntimeException();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> task.onFailure(randomAlphaOfLength(10), expectedException));