update cluster-state task execution to halt on new phase (#32886)
As we migrate to a per-phase execution model, we need to prepare our cluster-state-step execution model to be aligned. It is the case that the final iteration into the next "currentStep" from the next phase would not be available in the registry yet. This change exits the execution loop early as to not jump into executing the next phase's steps before the registry is properly updated
This commit is contained in:
parent
4baa721459
commit
33522d4fb4
|
@ -50,6 +50,16 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@link Step}s for the current index and policy are executed in succession until the next step to be
|
||||
* executed is not a {@link ClusterStateActionStep}, or not a {@link ClusterStateWaitStep}, or does not
|
||||
* belong to the same phase as the executed step. All other types of steps are executed outside of this
|
||||
* {@link ClusterStateUpdateTask}, so they are of no concern here.
|
||||
*
|
||||
* @param currentState The current state to execute the <code>startStep</code> with
|
||||
* @return the new cluster state after cluster-state operations and step transitions are applied
|
||||
* @throws IOException if any exceptions occur
|
||||
*/
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws IOException {
|
||||
Step currentStep = startStep;
|
||||
|
@ -94,6 +104,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) {
|
||||
return currentState;
|
||||
}
|
||||
currentStep = policyStepsRegistry.getStep(policy, currentStep.getNextStepKey());
|
||||
}
|
||||
return currentState;
|
||||
|
|
|
@ -121,35 +121,32 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
|||
policyStepsRegistry.update(lifecycleMetadata, client, () -> 0L);
|
||||
}
|
||||
|
||||
public void testExecuteAllUntilEndOfPolicy() throws IOException {
|
||||
public void testExecuteAllUntilEndOfPhase() throws IOException {
|
||||
Step startStep = policyStepsRegistry.getFirstStep(allClusterPolicyName);
|
||||
Step afterStep = policyStepsRegistry.getStep(allClusterPolicyName, startStep.getNextStepKey());
|
||||
long now = randomNonNegativeLong();
|
||||
// test execute start till end of phase `new`
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(allClusterPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(afterStep.getKey()));
|
||||
// test execute phase-after until firstStep
|
||||
task = new ExecuteStepsUpdateTask(allClusterPolicyName, index, afterStep, policyStepsRegistry, () -> now);
|
||||
newState = task.execute(newState);
|
||||
currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(firstStep.getKey()));
|
||||
// test execute all actions in same phase
|
||||
task = new ExecuteStepsUpdateTask(allClusterPolicyName, index, firstStep, policyStepsRegistry, () -> now);
|
||||
newState = task.execute(newState);
|
||||
assertThat(firstStep.getExecuteCount(), equalTo(1L));
|
||||
assertThat(allClusterSecondStep.getExecuteCount(), equalTo(1L));
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(TerminalPolicyStep.KEY));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(""));
|
||||
}
|
||||
|
||||
public void testExecuteMoveToNextActionStep() throws IOException {
|
||||
secondStep.setWillComplete(false);
|
||||
Step startStep = policyStepsRegistry.getFirstStep(mixedPolicyName);
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(firstStep.getExecuteCount(), equalTo(1L));
|
||||
assertThat(secondStep.getExecuteCount(), equalTo(1L));
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(secondStepKey));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(""));
|
||||
}
|
||||
|
||||
public void testNeverExecuteNonClusterStateStep() throws IOException {
|
||||
setStateToKey(thirdStepKey);
|
||||
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, thirdStepKey);
|
||||
|
|
Loading…
Reference in New Issue