[7.x] Stop policy on last PhaseCompleteStep instead of Termina… (#51758)

Currently when an ILM policy finishes its execution, the index moves into the `TerminalPolicyStep`,
denoted by a completed/completed/completed phase/action/step lifecycle execution state.

This commit changes the behavior so that the index lifecycle execution state halts at the last
configured phase's `PhaseCompleteStep`, so for instance, if an index were configured with a policy
containing a `hot` and `cold` phase, the index would stop at the `cold/complete/complete`
`PhaseCompleteStep`. This allows an ILM user to update the policy to add any later phases and have
indices configured to use that policy pick up execution at the newly added "later" phase. For
example, if a `delete` phase were added to the policy specified about, the index would then move
from `cold/complete/complete` into the `delete` phase.

Relates to #48431
This commit is contained in:
Lee Hinman 2020-01-31 10:36:41 -07:00 committed by GitHub
parent 3147453600
commit deefc85d60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 157 additions and 145 deletions

View File

@ -174,9 +174,7 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
List<Phase> orderedPhases = type.getOrderedPhases(phases);
ListIterator<Phase> phaseIterator = orderedPhases.listIterator(orderedPhases.size());
// final step so that policy can properly update cluster-state with last action completed
steps.add(TerminalPolicyStep.INSTANCE);
Step.StepKey lastStepKey = TerminalPolicyStep.KEY;
Step.StepKey lastStepKey = null;
Phase phase = null;
// add steps for each phase, in reverse
@ -185,7 +183,7 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
Phase previousPhase = phaseIterator.previous();
// add `after` step for phase before next
if (phase != null) {
if (previousPhase != null) {
// after step should have the name of the previous phase since the index is still in the
// previous phase until the after condition is reached
Step.StepKey afterStepKey = new Step.StepKey(previousPhase.getName(), PhaseCompleteStep.NAME, PhaseCompleteStep.NAME);
@ -210,13 +208,11 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
}
}
if (phase != null) {
// The very first after step is in a phase before the hot phase so call this "new"
Step.StepKey afterStepKey = new Step.StepKey("new", PhaseCompleteStep.NAME, PhaseCompleteStep.NAME);
Step phaseAfterStep = new PhaseCompleteStep(afterStepKey, lastStepKey);
steps.add(phaseAfterStep);
lastStepKey = phaseAfterStep.getKey();
}
// The very first after step is in a phase before the hot phase so call this "new"
Step.StepKey afterStepKey = new Step.StepKey("new", PhaseCompleteStep.NAME, PhaseCompleteStep.NAME);
Step phaseAfterStep = new PhaseCompleteStep(afterStepKey, lastStepKey);
steps.add(phaseAfterStep);
lastStepKey = phaseAfterStep.getKey();
// init step so that policy is guaranteed to have
steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey));

View File

@ -15,4 +15,8 @@ public class PhaseCompleteStep extends Step {
public PhaseCompleteStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
public static PhaseCompleteStep finalStep(String phase) {
return new PhaseCompleteStep(new StepKey(phase, NAME, NAME), null);
}
}

View File

@ -247,14 +247,14 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertThat(steps.size(), equalTo(2));
assertThat(steps.get(0), instanceOf(InitializePolicyContextStep.class));
assertThat(steps.get(0).getKey(), equalTo(new StepKey("new", "init", "init")));
assertThat(steps.get(0).getNextStepKey(), equalTo(TerminalPolicyStep.KEY));
assertSame(steps.get(1), TerminalPolicyStep.INSTANCE);
assertThat(steps.get(0).getNextStepKey(), equalTo(PhaseCompleteStep.finalStep("new").getKey()));
assertThat(steps.get(1), equalTo(PhaseCompleteStep.finalStep("new")));
}
public void testToStepsWithOneStep() {
Client client = mock(Client.class);
MockStep mockStep = new MockStep(
new Step.StepKey("test", "test", "test"), TerminalPolicyStep.KEY);
new Step.StepKey("test", "test", "test"), PhaseCompleteStep.finalStep("test").getKey());
lifecycleName = randomAlphaOfLengthBetween(1, 20);
Map<String, Phase> phases = new LinkedHashMap<>();
@ -264,7 +264,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
phases.put(firstPhase.getName(), firstPhase);
LifecyclePolicy policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, phases);
StepKey firstStepKey = InitializePolicyContextStep.KEY;
StepKey secondStepKey = new StepKey("new", PhaseCompleteStep.NAME, PhaseCompleteStep.NAME);
StepKey secondStepKey = PhaseCompleteStep.finalStep("new").getKey();
List<Step> steps = policy.toSteps(client);
assertThat(steps.size(), equalTo(4));
assertSame(steps.get(0).getKey(), firstStepKey);
@ -272,13 +272,14 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertThat(steps.get(1).getKey(), equalTo(secondStepKey));
assertThat(steps.get(1).getNextStepKey(), equalTo(mockStep.getKey()));
assertThat(steps.get(2).getKey(), equalTo(mockStep.getKey()));
assertThat(steps.get(2).getNextStepKey(), equalTo(TerminalPolicyStep.KEY));
assertSame(steps.get(3), TerminalPolicyStep.INSTANCE);
assertThat(steps.get(2).getNextStepKey(), equalTo(PhaseCompleteStep.finalStep("test").getKey()));
assertThat(steps.get(3), equalTo(PhaseCompleteStep.finalStep("test")));
}
public void testToStepsWithTwoPhases() {
Client client = mock(Client.class);
MockStep secondActionStep = new MockStep(new StepKey("second_phase", "test2", "test"), TerminalPolicyStep.KEY);
MockStep secondActionStep = new MockStep(new StepKey("second_phase", "test2", "test"),
PhaseCompleteStep.finalStep("second_phase").getKey());
MockStep secondAfter = new MockStep(new StepKey("first_phase", PhaseCompleteStep.NAME, PhaseCompleteStep.NAME),
secondActionStep.getKey());
MockStep firstActionAnotherStep = new MockStep(new StepKey("first_phase", "test", "bar"), secondAfter.getKey());
@ -312,7 +313,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertThat(steps.get(4).getKey(), equalTo(secondAfter.getKey()));
assertThat(steps.get(4).getNextStepKey(), equalTo(secondAfter.getNextStepKey()));
assertThat(steps.get(5), equalTo(secondActionStep));
assertSame(steps.get(6), TerminalPolicyStep.INSTANCE);
assertThat(steps.get(6), equalTo(PhaseCompleteStep.finalStep("second_phase")));
}
public void testIsActionSafe() {

View File

@ -146,7 +146,8 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
.build());
// start snapshot
request = new Request("PUT", "/_snapshot/repo/snapshot");
String snapName = "snapshot-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
request = new Request("PUT", "/_snapshot/repo/" + snapName);
request.addParameter("wait_for_completion", "false");
request.setJsonEntity("{\"indices\": \"" + indexName + "\"}");
assertOK(client().performRequest(request));
@ -165,14 +166,14 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
// Following index should have the document
assertDocumentExists(client(), indexName, "1");
// ILM should have completed the unfollow
assertILMPolicy(client(), indexName, "unfollow-only", "completed");
assertILMPolicy(client(), indexName, "unfollow-only", "hot", "complete", "complete");
}, 2, TimeUnit.MINUTES);
// assert that snapshot succeeded
assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS"));
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot")));
assertThat(getSnapshotState(snapName), equalTo("SUCCESS"));
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/" + snapName)));
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("GET", "/_snapshot/repo/snapshot")));
() -> client().performRequest(new Request("GET", "/_snapshot/repo/" + snapName)));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
}
} else {
@ -344,7 +345,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + shrunkenIndexName + "/_alias/" + indexName))));
// Wait for the index to complete its policy
assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, "completed", "completed", "completed"));
assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, null, "complete", "complete"));
}
}
@ -391,7 +392,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
assertBusy(() -> assertTrue(indexExists(shrunkenIndexName)));
// Wait for the index to complete its policy
assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, "completed", "completed", "completed"));
assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, null, "complete", "complete"));
}
}
@ -461,8 +462,8 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
assertEquals(RestStatus.OK.getStatus(), shrunkenIndexExistsResponse.getStatusLine().getStatusCode());
// And both of these should now finish their policies
assertILMPolicy(leaderClient, shrunkenIndexName, policyName, "completed");
assertILMPolicy(client(), indexName, policyName, "completed");
assertILMPolicy(leaderClient, shrunkenIndexName, policyName, null, "complete", "complete");
assertILMPolicy(client(), indexName, policyName, "hot", "complete", "complete");
});
}
} else {
@ -542,7 +543,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
client().performRequest(new Request("POST", "/_ilm/start"));
// Wait for the policy to be complete
assertBusy(() -> {
assertILMPolicy(client(), followerIndex, policyName, "completed", "completed", "completed");
assertILMPolicy(client(), followerIndex, policyName, "hot", "complete", "complete");
});
// Ensure the "follower" index has successfully unfollowed

View File

@ -20,9 +20,9 @@ import org.elasticsearch.xpack.core.ilm.AllocateAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
import java.io.IOException;
@ -113,7 +113,7 @@ public class ChangePolicyforIndexIT extends ESRestTestCase {
assertOK(client().performRequest(request));
// Check the index goes to the warm phase and completes
assertBusy(() -> assertStep(indexName, TerminalPolicyStep.KEY), 30, TimeUnit.SECONDS);
assertBusy(() -> assertStep(indexName, PhaseCompleteStep.finalStep("warm").getKey()), 30, TimeUnit.SECONDS);
// Check index is allocated on integTest-1 and integTest-2 as per policy_2
Request getSettingsRequest = new Request("GET", "/" + indexName + "/_settings");

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
@ -42,7 +43,6 @@ import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep;
import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
@ -229,7 +229,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
// assert corrected policy is picked up and index is shrunken
assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
@ -291,7 +291,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(originalIndex, policy);
// index document {"foo": "bar"} to trigger rollover
index(client(), originalIndex, "_id", "foo", "bar");
assertBusy(() -> assertEquals(TerminalPolicyStep.KEY, getStepKeyForIndex(originalIndex)));
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
assertBusy(() -> assertTrue(indexExists(originalIndex)));
assertBusy(() -> assertFalse(indexExists(secondIndex)));
assertBusy(() -> assertEquals("true", getOnlyIndexSettings(originalIndex).get(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE)));
@ -302,10 +302,11 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
String allocateNodeName = "integTest-" + randomFrom(0, 1);
AllocateAction allocateAction = new AllocateAction(null, null, null, singletonMap("_name", allocateNodeName));
createNewSingletonPolicy(randomFrom("warm", "cold"), allocateAction);
String endPhase = randomFrom("warm", "cold");
createNewSingletonPolicy(endPhase, allocateAction);
updatePolicy(index, policy);
assertBusy(() -> {
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey()));
});
ensureGreen(index);
}
@ -317,11 +318,12 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numReplicas));
AllocateAction allocateAction = new AllocateAction(finalNumReplicas, null, null, null);
createNewSingletonPolicy(randomFrom("warm", "cold"), allocateAction);
String endPhase = randomFrom("warm", "cold");
createNewSingletonPolicy(endPhase, allocateAction);
updatePolicy(index, policy);
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey()));
assertThat(settings.get(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()), equalTo(String.valueOf(finalNumReplicas)));
});
}
@ -453,7 +455,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(index, policy);
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
});
}
@ -487,7 +489,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(index, policy);
assertBusy(() -> {
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(numSegments.get(), equalTo(1));
assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
@ -506,7 +508,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(index, policy);
assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
@ -528,7 +530,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertFalse(indexExists(shrunkenIndex));
assertFalse(aliasExists(shrunkenIndex, index));
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(numberOfShards)));
assertNull(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()));
assertThat(settings.get(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue());
@ -572,7 +574,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertTrue(indexExists(shrunkenIndex));
assertTrue(aliasExists(shrunkenIndex, index));
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(1)));
assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
assertThat(settings.get(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue());
@ -593,7 +595,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(index, policy);
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("cold").getKey()));
assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true"));
assertThat(settings.get("index.frozen"), equalTo("true"));
@ -631,7 +633,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
// assert that the index froze
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("cold").getKey()));
assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true"));
assertThat(settings.get("index.frozen"), equalTo("true"));
@ -652,7 +654,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(index, policy);
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
assertThat(settings.get(IndexMetaData.INDEX_PRIORITY_SETTING.getKey()), equalTo(String.valueOf(priority)));
});
}
@ -664,7 +666,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
updatePolicy(index, policy);
assertBusy(() -> {
Map<String, Object> settings = getOnlyIndexSettings(index);
assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
assertNull(settings.get(IndexMetaData.INDEX_PRIORITY_SETTING.getKey()));
});
}
@ -812,7 +814,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertBusy(() -> assertTrue((boolean) explainIndex(originalIndex).getOrDefault("managed", false)));
// Wait for everything to be copacetic
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
public void testMoveToInjectedStep() throws Exception {
@ -847,7 +849,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertBusy(() -> {
assertTrue(indexExists(shrunkenIndex));
assertTrue(aliasExists(shrunkenIndex, index));
assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY));
assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
});
}
@ -1012,7 +1014,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
client().performRequest(allowWritesOnIndexSettingUpdate);
// index is not readonly so the ILM should complete successfully
assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
public void testILMRolloverOnManuallyRolledIndex() throws Exception {
@ -1062,7 +1064,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
client().performRequest(refreshOriginalIndex);
// Wait for the rollover policy to execute
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
// ILM should manage the second index after attempting (and skipping) rolling the original index
assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", true)));
@ -1075,7 +1077,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
client().performRequest(refreshSecondIndex).getStatusLine();
// ILM should rollover the second index even though it skipped the first one
assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
assertBusy(() -> assertTrue(indexExists(thirdIndex)));
}
@ -1149,7 +1151,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
// the rollover step should eventually succeed
assertBusy(() -> assertThat(indexExists(rolledIndex), is(true)));
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
public void testUpdateRolloverLifecycleDateStepRetriesWhenRolloverInfoIsMissing() throws Exception {
@ -1210,7 +1212,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
"}"
);
client().performRequest(rolloverRequest);
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
public void testWaitForActiveShardsStep() throws Exception {
@ -1245,7 +1247,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
// reset the number of replicas to 0 so that the second index wait for active shard condition can be met
updateIndexSettings(secondIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50353")
@ -1275,7 +1277,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh");
client().performRequest(refreshIndex);
assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1"), equalTo(TerminalPolicyStep.KEY)));
assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1"), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete"), 30, TimeUnit.SECONDS);
assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks"), 30, TimeUnit.SECONDS);
@ -1398,7 +1400,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertBusy(() -> {
Map<String, Object> explainResp = explainIndex(index);
String phase = (String) explainResp.get("phase");
assertThat(phase, equalTo(TerminalPolicyStep.COMPLETED_PHASE));
assertThat(phase, equalTo("hot"));
});
}
@ -1434,7 +1436,44 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
// Index should now have been able to roll over, creating the new index and proceeding to the "complete" step
assertBusy(() -> assertThat(indexExists(index + "-000002"), is(true)));
assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(TerminalPolicyStep.KEY.getName())));
assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(PhaseCompleteStep.NAME)));
}
public void testHaltAtEndOfPhase() throws Exception {
String index = "halt-index";
createNewSingletonPolicy("hot", new SetPriorityAction(100));
createIndexWithSettings(index,
Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(LifecycleSettings.LIFECYCLE_NAME, policy),
randomBoolean());
// Wait for the index to finish the "hot" phase
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
// Update the policy to add a delete phase
{
Map<String, LifecycleAction> hotActions = new HashMap<>();
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
Map<String, Phase> phases = new HashMap<>();
phases.put("hot", new Phase("hot", TimeValue.ZERO, hotActions));
phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
// PUT policy
XContentBuilder builder = jsonBuilder();
lifecyclePolicy.toXContent(builder, null);
final StringEntity entity = new StringEntity(
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
Request request = new Request("PUT", "_ilm/policy/" + policy);
request.setEntity(entity);
assertOK(client().performRequest(request));
}
// The index should move into the deleted phase and be deleted
assertBusy(() -> assertFalse("expected " + index + " to be deleted by ILM", indexExists(index)));
}
// This method should be called inside an assertBusy, it has no retry logic of its own

View File

@ -140,7 +140,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
if (logger.isTraceEnabled()) {
logger.trace("[{}] condition not met ({}) [{}], returning existing state (info: {})",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(),
Strings.toString(stepInfo));
stepInfo == null ? "null" : Strings.toString(stepInfo));
}
// We may have executed a step and set "nextStepKey" to
// a value, but in this case, since the condition was

View File

@ -143,6 +143,11 @@ class IndexLifecycleRunner {
index, currentStep.getClass().getSimpleName(), currentStep.getKey());
// Only phase changing and async wait steps should be run through periodic polling
if (currentStep instanceof PhaseCompleteStep) {
if (currentStep.getNextStepKey() == null) {
logger.debug("[{}] stopping in the current phase ({}) as there are no more steps in the policy",
index, currentStep.getKey().getPhase());
return;
}
// Only proceed to the next step if enough time has elapsed to go into the next phase
if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) {
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
@ -314,6 +319,11 @@ class IndexLifecycleRunner {
logger.trace("[{}] maybe running step ({}) after state change: {}",
index, currentStep.getClass().getSimpleName(), currentStep.getKey());
if (currentStep instanceof PhaseCompleteStep) {
if (currentStep.getNextStepKey() == null) {
logger.debug("[{}] stopping in the current phase ({}) as there are no more steps in the policy",
index, currentStep.getKey().getPhase());
return;
}
// Only proceed to the next step if enough time has elapsed to go into the next phase
if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) {
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());

View File

@ -78,7 +78,8 @@ public final class IndexLifecycleTransition {
"], currently: [" + realKey + "]");
}
if (stepRegistry.stepExists(indexPolicySetting, newStepKey) == false) {
// Always allow moving to the terminal step, even if it doesn't exist in the policy
if (stepRegistry.stepExists(indexPolicySetting, newStepKey) == false && newStepKey.equals(TerminalPolicyStep.KEY) == false) {
throw new IllegalArgumentException("step [" + newStepKey + "] for index [" + idxMeta.getIndex().getName() +
"] with policy [" + indexPolicySetting + "] does not exist");
}

View File

@ -5,11 +5,8 @@
*/
package org.elasticsearch.xpack.ilm;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
@ -40,10 +37,10 @@ import org.elasticsearch.xpack.core.ilm.LifecycleType;
import org.elasticsearch.xpack.core.ilm.MockAction;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction;
import org.elasticsearch.xpack.core.ilm.action.GetLifecycleAction;
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
@ -65,7 +62,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -91,7 +87,8 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase {
static {
List<Step> steps = new ArrayList<>();
Step.StepKey key = new Step.StepKey("mock", ObservableAction.NAME, ObservableClusterStateWaitStep.NAME);
steps.add(new ObservableClusterStateWaitStep(key, TerminalPolicyStep.KEY));
Step.StepKey compKey = new Step.StepKey("mock", "complete", "complete");
steps.add(new ObservableClusterStateWaitStep(key, compKey));
OBSERVABLE_ACTION = new ObservableAction(steps, true);
}
@ -147,7 +144,9 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase {
.put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, "test").build();
List<Step> steps = new ArrayList<>();
Step.StepKey key = new Step.StepKey("mock", ObservableAction.NAME, ObservableClusterStateWaitStep.NAME);
steps.add(new ObservableClusterStateWaitStep(key, TerminalPolicyStep.KEY));
Step.StepKey compKey = new Step.StepKey("mock", "complete", "complete");
steps.add(new ObservableClusterStateWaitStep(key, compKey));
steps.add(new PhaseCompleteStep(compKey, null));
Map<String, LifecycleAction> actions = Collections.singletonMap(ObservableAction.NAME, OBSERVABLE_ACTION);
mockPhase = new Phase("mock", TimeValue.timeValueSeconds(0), actions);
Map<String, Phase> phases = Collections.singletonMap("mock", mockPhase);
@ -203,7 +202,7 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase {
assertBusy(() -> {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(client().admin().cluster()
.prepareState().execute().actionGet().getState().getMetaData().index("test"));
assertThat(lifecycleState.getStep(), equalTo(TerminalPolicyStep.KEY.getName()));
assertThat(lifecycleState.getStep(), equalTo("complete"));
});
}
@ -270,11 +269,12 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase {
.setSettings(Collections.singletonMap("index.lifecycle.test.complete", true)).get();
{
PhaseExecutionInfo expectedExecutionInfo = new PhaseExecutionInfo(lifecyclePolicy.getName(), null, 1L, actualModifiedDate);
Phase phase = new Phase("mock", TimeValue.ZERO, Collections.singletonMap("TEST_ACTION", OBSERVABLE_ACTION));
PhaseExecutionInfo expectedExecutionInfo = new PhaseExecutionInfo(lifecyclePolicy.getName(), phase, 1L, actualModifiedDate);
assertBusy(() -> {
IndexLifecycleExplainResponse indexResponse = executeExplainRequestAndGetTestIndexResponse("test");
assertThat(indexResponse.getPhase(), equalTo(TerminalPolicyStep.COMPLETED_PHASE));
assertThat(indexResponse.getStep(), equalTo(TerminalPolicyStep.KEY.getName()));
assertThat("expected to be in the 'mock' phase", indexResponse.getPhase(), equalTo("mock"));
assertThat("expected to be in the mock phase complete step", indexResponse.getStep(), equalTo("complete"));
assertThat(indexResponse.getPhaseExecutionInfo(), equalTo(expectedExecutionInfo));
});
}
@ -395,80 +395,7 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase {
assertBusy(() -> {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(client().admin().cluster()
.prepareState().execute().actionGet().getState().getMetaData().index("test"));
assertThat(lifecycleState.getStep(), equalTo(TerminalPolicyStep.KEY.getName()));
});
}
public void testMasterFailover() throws Exception {
// start one server
logger.info("Starting sever1");
final String server_1 = internalCluster().startNode();
final String node1 = getLocalNodeId(server_1);
logger.info("Creating lifecycle [test_lifecycle]");
PutLifecycleAction.Request putLifecycleRequest = new PutLifecycleAction.Request(lifecyclePolicy);
PutLifecycleAction.Response putLifecycleResponse = client().execute(PutLifecycleAction.INSTANCE, putLifecycleRequest).get();
assertAcked(putLifecycleResponse);
logger.info("Creating index [test]");
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings))
.actionGet();
assertAcked(createIndexResponse);
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1);
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
logger.info("Starting server2");
// start another server
internalCluster().startNode();
// first wait for 2 nodes in the cluster
logger.info("Waiting for replicas to be assigned");
ClusterHealthResponse clusterHealth = client().admin().cluster()
.health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
// check step in progress in lifecycle
assertBusy(() -> {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(client().admin().cluster()
.prepareState().execute().actionGet().getState().getMetaData().index("test"));
assertThat(lifecycleState.getStep(), equalTo(ObservableClusterStateWaitStep.NAME));
});
if (randomBoolean()) {
// this checks that the phase execution is picked up from the phase definition settings
logger.info("updating lifecycle [test_lifecycle] to be empty");
PutLifecycleAction.Request updateLifecycleRequest = new PutLifecycleAction.Request
(newLockableLifecyclePolicy(lifecyclePolicy.getName(), Collections.emptyMap()));
PutLifecycleAction.Response updateLifecycleResponse = client()
.execute(PutLifecycleAction.INSTANCE, updateLifecycleRequest).get();
assertAcked(updateLifecycleResponse);
}
logger.info("Closing server1");
// kill the first server
internalCluster().stopCurrentMasterNode();
// check that index lifecycle picked back up where it
assertBusy(() -> {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(client().admin().cluster()
.prepareState().execute().actionGet().getState().getMetaData().index("test"));
assertThat(lifecycleState.getStep(), equalTo(ObservableClusterStateWaitStep.NAME));
});
logger.info("new master is operation");
// complete the step
AcknowledgedResponse repsonse = client().admin().indices().prepareUpdateSettings("test")
.setSettings(Collections.singletonMap("index.lifecycle.test.complete", true)).get();
assertBusy(() -> {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(client().admin().cluster()
.prepareState().execute().actionGet().getState().getMetaData().index("test"));
assertThat(lifecycleState.getStep(), equalTo(TerminalPolicyStep.KEY.getName()));
assertThat(lifecycleState.getStep(), equalTo("complete"));
});
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.xpack.core.ilm.MockAction;
import org.elasticsearch.xpack.core.ilm.MockStep;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.RolloverActionTests;
@ -131,6 +132,38 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyPhaseCompletePolicyStep() {
String policyName = "async_action_policy";
PhaseCompleteStep step = PhaseCompleteStep.finalStep(randomAlphaOfLength(4));
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
runner.runPolicyAfterStateChange(policyName, indexMetaData);
runner.runPeriodicStep(policyName, indexMetaData);
Mockito.verifyZeroInteractions(clusterService);
}
public void testRunPolicyPhaseCompleteWithMoreStepsPolicyStep() {
String policyName = "async_action_policy";
TerminalPolicyStep stop = TerminalPolicyStep.INSTANCE;
PhaseCompleteStep step = new PhaseCompleteStep(new StepKey("cold", "complete", "complete"), stop.getKey());
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
runner.runPolicyAfterStateChange(policyName, indexMetaData);
runner.runPeriodicStep(policyName, indexMetaData);
Mockito.verify(clusterService, times(2)).submitStateUpdateTask(any(), any());
}
public void testRunPolicyErrorStep() {
String policyName = "async_action_policy";
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases(policyName);