Adds phase and action change times on state transition
This commit is contained in:
parent
d17886f1b0
commit
bf3db3f1f7
|
@ -14,18 +14,23 @@ import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(ExecuteStepsUpdateTask.class);
|
||||
private final String policy;
|
||||
private final Index index;
|
||||
private final Step startStep;
|
||||
private final PolicyStepsRegistry policyStepsRegistry;
|
||||
private LongSupplier nowSupplier;
|
||||
|
||||
public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry) {
|
||||
public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry,
|
||||
LongSupplier nowSupplier) {
|
||||
this.policy = policy;
|
||||
this.index = index;
|
||||
this.startStep = startStep;
|
||||
this.policyStepsRegistry = policyStepsRegistry;
|
||||
this.nowSupplier = nowSupplier;
|
||||
}
|
||||
|
||||
String getPolicy() {
|
||||
|
@ -59,7 +64,8 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||
if (currentStep.getNextStepKey() == null) {
|
||||
return currentState;
|
||||
}
|
||||
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
|
||||
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
|
||||
currentStep.getNextStepKey(), nowSupplier);
|
||||
} else {
|
||||
// cluster state wait step so evaluate the
|
||||
// condition, if the condition is met move to the
|
||||
|
@ -72,7 +78,8 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||
if (currentStep.getNextStepKey() == null) {
|
||||
return currentState;
|
||||
}
|
||||
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
|
||||
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
|
||||
currentStep.getNextStepKey(), nowSupplier);
|
||||
} else {
|
||||
logger.warn("condition not met, returning existing state");
|
||||
return currentState;
|
||||
|
|
|
@ -24,14 +24,18 @@ import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class IndexLifecycleRunner {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class);
|
||||
private PolicyStepsRegistry stepRegistry;
|
||||
private ClusterService clusterService;
|
||||
private LongSupplier nowSupplier;
|
||||
|
||||
public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService) {
|
||||
public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, LongSupplier nowSupplier) {
|
||||
this.stepRegistry = stepRegistry;
|
||||
this.clusterService = clusterService;
|
||||
this.nowSupplier = nowSupplier;
|
||||
}
|
||||
|
||||
public void runPolicy(String policy, IndexMetaData indexMetaData, Settings indexSettings, boolean fromClusterStateChange) {
|
||||
|
@ -92,7 +96,7 @@ public class IndexLifecycleRunner {
|
|||
|
||||
private void executeClusterStateSteps(Index index, String policy, Step step) {
|
||||
assert step instanceof InitializePolicyContextStep || step instanceof ClusterStateWaitStep;
|
||||
clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(policy, index, step, stepRegistry));
|
||||
clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(policy, index, step, stepRegistry, nowSupplier));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -128,11 +132,18 @@ public class IndexLifecycleRunner {
|
|||
}
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey nextStep) {
|
||||
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
|
||||
LongSupplier nowSupplier) {
|
||||
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName());
|
||||
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
|
||||
indexSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowSupplier.getAsLong());
|
||||
}
|
||||
if (currentStep.getAction().equals(nextStep.getAction()) == false) {
|
||||
indexSettings.put(LifecycleSettings.LIFECYCLE_ACTION_TIME, nowSupplier.getAsLong());
|
||||
}
|
||||
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData
|
||||
.builder(clusterState.getMetaData().index(index))
|
||||
.settings(indexSettings)));
|
||||
|
@ -143,6 +154,6 @@ public class IndexLifecycleRunner {
|
|||
logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
|
||||
+ nextStepKey);
|
||||
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
|
||||
nextStepKey, newState -> runPolicy(newState.getMetaData().index(index))));
|
||||
nextStepKey, nowSupplier, newState -> runPolicy(newState.getMetaData().index(index))));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
this.nowSupplier = nowSupplier;
|
||||
this.scheduledJob = null;
|
||||
this.policyRegistry = new PolicyStepsRegistry();
|
||||
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService);
|
||||
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, nowSupplier);
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
|
|
|
@ -12,19 +12,23 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
|
||||
private final Index index;
|
||||
private final String policy;
|
||||
private final Step.StepKey currentStepKey;
|
||||
private final Step.StepKey nextStepKey;
|
||||
private final Listener listener;
|
||||
private LongSupplier nowSupplier;
|
||||
|
||||
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey,
|
||||
Listener listener) {
|
||||
LongSupplier nowSupplier, Listener listener) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.nextStepKey = nextStepKey;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
|
@ -49,7 +53,7 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
|
|||
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
|
||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings))) {
|
||||
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, nextStepKey);
|
||||
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier);
|
||||
} else {
|
||||
// either the policy has changed or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
|
|
|
@ -27,10 +27,8 @@ import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunnerTests.MockInitializePolicyContextStep;
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunnerTests.MockClusterStateWaitStep;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunnerTests.MockClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunnerTests.MockInitializePolicyContextStep;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -38,6 +36,9 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
||||
|
||||
private static final StepKey firstStepKey = new StepKey("phase_1", "action_1", "step_1");
|
||||
|
@ -103,53 +104,66 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
|||
|
||||
public void testExecuteAllUntilEndOfPolicy() {
|
||||
Step startStep = policyStepsRegistry.getFirstStep(allClusterPolicyName);
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(allClusterPolicyName, index, startStep, policyStepsRegistry);
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(allClusterPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(firstStep.getExecuteCount(), equalTo(1L));
|
||||
assertThat(allClusterSecondStep.getExecuteCount(), equalTo(1L));
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(TerminalPolicyStep.KEY));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
}
|
||||
|
||||
public void testExecuteMoveToNextActionStep() {
|
||||
secondStep.setWillComplete(false);
|
||||
Step startStep = policyStepsRegistry.getFirstStep(mixedPolicyName);
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry);
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(firstStep.getExecuteCount(), equalTo(1L));
|
||||
assertThat(secondStep.getExecuteCount(), equalTo(1L));
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(secondStepKey));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
}
|
||||
|
||||
public void testNeverExecuteNonClusterStateStep() {
|
||||
setStateToKey(thirdStepKey);
|
||||
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, thirdStepKey);
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry);
|
||||
assertThat(task.execute(clusterState), equalTo(clusterState));
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
assertThat(task.execute(clusterState), sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testExecuteUntilFirstNonClusterStateStep() {
|
||||
setStateToKey(secondStepKey);
|
||||
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry);
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(thirdStepKey));
|
||||
assertThat(firstStep.getExecuteCount(), equalTo(0L));
|
||||
assertThat(secondStep.getExecuteCount(), equalTo(1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
}
|
||||
|
||||
public void testExecuteIncompleteWaitStep() {
|
||||
secondStep.setWillComplete(false);
|
||||
setStateToKey(secondStepKey);
|
||||
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry);
|
||||
long now = randomNonNegativeLong();
|
||||
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(currentStepKey, equalTo(secondStepKey));
|
||||
assertThat(firstStep.getExecuteCount(), equalTo(0L));
|
||||
assertThat(secondStep.getExecuteCount(), equalTo(1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
}
|
||||
|
||||
private void setStateToKey(StepKey stepKey) {
|
||||
|
|
|
@ -50,7 +50,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
TerminalPolicyStep step = TerminalPolicyStep.INSTANCE;
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -66,7 +66,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
MockInitializePolicyContextStep step = new MockInitializePolicyContextStep(stepKey, null);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -85,7 +85,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setWillComplete(true);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -104,7 +104,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setWillComplete(true);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -125,7 +125,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setIndexSurvives(false);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -143,7 +143,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setWillComplete(false);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -162,7 +162,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setException(expectedException);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -183,7 +183,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setException(expectedException);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -201,7 +201,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setWillComplete(true);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -221,7 +221,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setWillComplete(false);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -240,7 +240,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setException(expectedException);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -261,7 +261,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
step.setException(expectedException);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -278,7 +278,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
MockStep step = new MockStep(stepKey, null);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
Settings indexSettings = Settings.builder().build();
|
||||
|
@ -462,18 +462,65 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
|
||||
public void testMoveClusterStateToNextStep() {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStep = new StepKey("next_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
ClusterState clusterState = buildClusterState(indexName, Settings.builder());
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, nextStep);
|
||||
assertClusterStateOnNextStep(clusterState, index, nextStep, newClusterState);
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
||||
() -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
||||
clusterState = buildClusterState(indexName, Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, "current_phase")
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, "current_action").put(LifecycleSettings.LIFECYCLE_STEP, "current_step"));
|
||||
clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
index = clusterState.metaData().index(indexName).getIndex();
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, nextStep);
|
||||
assertClusterStateOnNextStep(clusterState, index, nextStep, newClusterState);
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToNextStepSamePhase() {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStep = new StepKey("current_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
ClusterState clusterState = buildClusterState(indexName, Settings.builder());
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
||||
() -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
||||
clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
index = clusterState.metaData().index(indexName).getIndex();
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToNextStepSameAction() {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStep = new StepKey("current_phase", "current_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
ClusterState clusterState = buildClusterState(indexName, Settings.builder());
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
||||
() -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
||||
clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
index = clusterState.metaData().index(indexName).getIndex();
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
}
|
||||
|
||||
private ClusterState buildClusterState(String indexName, Settings.Builder indexSettingsBuilder) {
|
||||
|
@ -485,7 +532,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
return ClusterState.builder(new ClusterName("my_cluster")).metaData(metadata).build();
|
||||
}
|
||||
|
||||
private void assertClusterStateOnNextStep(ClusterState oldClusterState, Index index, StepKey nextStep, ClusterState newClusterState) {
|
||||
private void assertClusterStateOnNextStep(ClusterState oldClusterState, Index index, StepKey currentStep, StepKey nextStep, ClusterState newClusterState, long now) {
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
MetaData newMetadata = newClusterState.metaData();
|
||||
assertNotSame(oldClusterState.metaData(), newMetadata);
|
||||
|
@ -496,6 +543,18 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
assertEquals(nextStep.getPhase(), LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(newIndexSettings));
|
||||
assertEquals(nextStep.getAction(), LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(newIndexSettings));
|
||||
assertEquals(nextStep.getName(), LifecycleSettings.LIFECYCLE_STEP_SETTING.get(newIndexSettings));
|
||||
if (currentStep.getPhase().equals(nextStep.getPhase())) {
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
|
||||
} else {
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
if (currentStep.getAction().equals(nextStep.getAction())) {
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
} else {
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockAsyncActionStep extends AsyncActionStep {
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
|||
import org.junit.Before;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
|
@ -44,15 +45,18 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
|||
public void testExecuteSuccessfullyMoved() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey nextStepKey = new StepKey("next-phase", "next-action", "next-name");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(actualKey, equalTo(nextStepKey));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
task.clusterStateProcessed("source", clusterState, newState);
|
||||
assertTrue(changed.get());
|
||||
}
|
||||
|
@ -60,6 +64,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
|||
public void testExecuteNoop() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(currentStepKey);
|
||||
if (randomBoolean()) {
|
||||
setStateToKey(notCurrentStepKey);
|
||||
|
@ -67,17 +72,18 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
|||
setStatePolicy("not-" + policy);
|
||||
}
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> {};
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, equalTo(clusterState));
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testClusterProcessedWithNoChange() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(currentStepKey);
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
task.clusterStateProcessed("source", clusterState, clusterState);
|
||||
assertNull(changed.get());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue