Inject Unfollow before Rollover and Shrink (#37625)
We inject an Unfollow action before Shrink because the Shrink action cannot be safely used on a following index, as it may not be fully caught up with the leader index before the "original" following index is deleted and replaced with a non-following Shrunken index. The Unfollow action will verify that 1) the index is marked as "complete", and 2) all operations up to this point have been replicated from the leader to the follower before explicitly disconnecting the follower from the leader. Injecting an Unfollow action before the Rollover action is done mainly as a convenience: This allow users to use the same lifecycle policy on both the leader and follower cluster without having to explictly modify the policy to unfollow the index, while doing what we expect users to want in most cases.
This commit is contained in:
parent
19529da2db
commit
49bd8715ff
|
@ -107,6 +107,7 @@ integTestCluster {
|
||||||
// Truststore settings are not used since TLS is not enabled. Included for testing the get certificates API
|
// Truststore settings are not used since TLS is not enabled. Included for testing the get certificates API
|
||||||
setting 'xpack.security.http.ssl.certificate_authorities', 'testnode.crt'
|
setting 'xpack.security.http.ssl.certificate_authorities', 'testnode.crt'
|
||||||
setting 'xpack.security.transport.ssl.truststore.path', 'testnode.jks'
|
setting 'xpack.security.transport.ssl.truststore.path', 'testnode.jks'
|
||||||
|
setting 'indices.lifecycle.poll_interval', '1000ms'
|
||||||
keystoreSetting 'xpack.security.transport.ssl.truststore.secure_password', 'testnode'
|
keystoreSetting 'xpack.security.transport.ssl.truststore.secure_password', 'testnode'
|
||||||
setupCommand 'setupDummyUser',
|
setupCommand 'setupDummyUser',
|
||||||
'bin/elasticsearch-users',
|
'bin/elasticsearch-users',
|
||||||
|
|
|
@ -184,31 +184,37 @@ public class IndexLifecycleIT extends ESRestHighLevelClientTestCase {
|
||||||
|
|
||||||
createIndex("squash", Settings.EMPTY);
|
createIndex("squash", Settings.EMPTY);
|
||||||
|
|
||||||
ExplainLifecycleRequest req = new ExplainLifecycleRequest("foo-01", "baz-01", "squash");
|
// The injected Unfollow step will run pretty rapidly here, so we need
|
||||||
ExplainLifecycleResponse response = execute(req, highLevelClient().indexLifecycle()::explainLifecycle,
|
// to wait for it to settle into the "stable" step of waiting to be
|
||||||
|
// ready to roll over
|
||||||
|
assertBusy(() -> {
|
||||||
|
ExplainLifecycleRequest req = new ExplainLifecycleRequest("foo-01", "baz-01", "squash");
|
||||||
|
ExplainLifecycleResponse response = execute(req, highLevelClient().indexLifecycle()::explainLifecycle,
|
||||||
highLevelClient().indexLifecycle()::explainLifecycleAsync);
|
highLevelClient().indexLifecycle()::explainLifecycleAsync);
|
||||||
Map<String, IndexLifecycleExplainResponse> indexResponses = response.getIndexResponses();
|
Map<String, IndexLifecycleExplainResponse> indexResponses = response.getIndexResponses();
|
||||||
assertEquals(3, indexResponses.size());
|
assertEquals(3, indexResponses.size());
|
||||||
IndexLifecycleExplainResponse fooResponse = indexResponses.get("foo-01");
|
IndexLifecycleExplainResponse fooResponse = indexResponses.get("foo-01");
|
||||||
assertNotNull(fooResponse);
|
assertNotNull(fooResponse);
|
||||||
assertTrue(fooResponse.managedByILM());
|
assertTrue(fooResponse.managedByILM());
|
||||||
assertEquals("foo-01", fooResponse.getIndex());
|
assertEquals("foo-01", fooResponse.getIndex());
|
||||||
assertEquals("hot", fooResponse.getPhase());
|
assertEquals("hot", fooResponse.getPhase());
|
||||||
assertEquals("rollover", fooResponse.getAction());
|
assertEquals("rollover", fooResponse.getAction());
|
||||||
assertEquals("check-rollover-ready", fooResponse.getStep());
|
assertEquals("check-rollover-ready", fooResponse.getStep());
|
||||||
assertEquals(new PhaseExecutionInfo(policy.getName(), new Phase("", hotPhase.getMinimumAge(), hotPhase.getActions()),
|
assertEquals(new PhaseExecutionInfo(policy.getName(), new Phase("", hotPhase.getMinimumAge(), hotPhase.getActions()),
|
||||||
1L, expectedPolicyModifiedDate), fooResponse.getPhaseExecutionInfo());
|
1L, expectedPolicyModifiedDate), fooResponse.getPhaseExecutionInfo());
|
||||||
IndexLifecycleExplainResponse bazResponse = indexResponses.get("baz-01");
|
IndexLifecycleExplainResponse bazResponse = indexResponses.get("baz-01");
|
||||||
assertNotNull(bazResponse);
|
assertNotNull(bazResponse);
|
||||||
assertTrue(bazResponse.managedByILM());
|
assertTrue(bazResponse.managedByILM());
|
||||||
assertEquals("baz-01", bazResponse.getIndex());
|
assertEquals("baz-01", bazResponse.getIndex());
|
||||||
assertEquals("hot", bazResponse.getPhase());
|
assertEquals("hot", bazResponse.getPhase());
|
||||||
assertEquals("rollover", bazResponse.getAction());
|
assertEquals("rollover", bazResponse.getAction());
|
||||||
assertEquals("check-rollover-ready", bazResponse.getStep());
|
assertEquals("check-rollover-ready", bazResponse.getStep());
|
||||||
IndexLifecycleExplainResponse squashResponse = indexResponses.get("squash");
|
IndexLifecycleExplainResponse squashResponse = indexResponses.get("squash");
|
||||||
assertNotNull(squashResponse);
|
assertNotNull(squashResponse);
|
||||||
assertFalse(squashResponse.managedByILM());
|
assertFalse(squashResponse.managedByILM());
|
||||||
assertEquals("squash", squashResponse.getIndex());
|
assertEquals("squash", squashResponse.getIndex());
|
||||||
|
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeleteLifecycle() throws IOException {
|
public void testDeleteLifecycle() throws IOException {
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.client.documentation;
|
||||||
import org.apache.http.util.EntityUtils;
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.LatchedActionListener;
|
import org.elasticsearch.action.LatchedActionListener;
|
||||||
|
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||||
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
|
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
|
||||||
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RequestOptions;
|
||||||
import org.elasticsearch.client.Response;
|
import org.elasticsearch.client.Response;
|
||||||
|
@ -38,17 +39,17 @@ import org.elasticsearch.client.indexlifecycle.LifecycleAction;
|
||||||
import org.elasticsearch.client.indexlifecycle.LifecycleManagementStatusRequest;
|
import org.elasticsearch.client.indexlifecycle.LifecycleManagementStatusRequest;
|
||||||
import org.elasticsearch.client.indexlifecycle.LifecycleManagementStatusResponse;
|
import org.elasticsearch.client.indexlifecycle.LifecycleManagementStatusResponse;
|
||||||
import org.elasticsearch.client.indexlifecycle.LifecyclePolicy;
|
import org.elasticsearch.client.indexlifecycle.LifecyclePolicy;
|
||||||
import org.elasticsearch.client.indexlifecycle.OperationMode;
|
|
||||||
import org.elasticsearch.client.indexlifecycle.LifecyclePolicyMetadata;
|
import org.elasticsearch.client.indexlifecycle.LifecyclePolicyMetadata;
|
||||||
|
import org.elasticsearch.client.indexlifecycle.OperationMode;
|
||||||
import org.elasticsearch.client.indexlifecycle.Phase;
|
import org.elasticsearch.client.indexlifecycle.Phase;
|
||||||
import org.elasticsearch.client.indexlifecycle.PutLifecyclePolicyRequest;
|
import org.elasticsearch.client.indexlifecycle.PutLifecyclePolicyRequest;
|
||||||
import org.elasticsearch.client.indexlifecycle.RemoveIndexLifecyclePolicyRequest;
|
import org.elasticsearch.client.indexlifecycle.RemoveIndexLifecyclePolicyRequest;
|
||||||
import org.elasticsearch.client.indexlifecycle.RemoveIndexLifecyclePolicyResponse;
|
import org.elasticsearch.client.indexlifecycle.RemoveIndexLifecyclePolicyResponse;
|
||||||
import org.elasticsearch.client.indexlifecycle.RetryLifecyclePolicyRequest;
|
import org.elasticsearch.client.indexlifecycle.RetryLifecyclePolicyRequest;
|
||||||
import org.elasticsearch.client.indexlifecycle.RolloverAction;
|
import org.elasticsearch.client.indexlifecycle.RolloverAction;
|
||||||
|
import org.elasticsearch.client.indexlifecycle.ShrinkAction;
|
||||||
import org.elasticsearch.client.indexlifecycle.StartILMRequest;
|
import org.elasticsearch.client.indexlifecycle.StartILMRequest;
|
||||||
import org.elasticsearch.client.indexlifecycle.StopILMRequest;
|
import org.elasticsearch.client.indexlifecycle.StopILMRequest;
|
||||||
import org.elasticsearch.client.indexlifecycle.ShrinkAction;
|
|
||||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
@ -337,11 +338,13 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||||
new PutLifecyclePolicyRequest(policy);
|
new PutLifecyclePolicyRequest(policy);
|
||||||
client.indexLifecycle().putLifecyclePolicy(putRequest, RequestOptions.DEFAULT);
|
client.indexLifecycle().putLifecyclePolicy(putRequest, RequestOptions.DEFAULT);
|
||||||
|
|
||||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index")
|
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index-1")
|
||||||
.settings(Settings.builder()
|
.settings(Settings.builder()
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
.put("index.lifecycle.name", "my_policy")
|
.put("index.lifecycle.name", "my_policy")
|
||||||
|
.put("index.lifecycle.rollover_alias", "my_alias")
|
||||||
.build());
|
.build());
|
||||||
|
createIndexRequest.alias(new Alias("my_alias").writeIndex(true));
|
||||||
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
||||||
CreateIndexRequest createOtherIndexRequest = new CreateIndexRequest("other_index")
|
CreateIndexRequest createOtherIndexRequest = new CreateIndexRequest("other_index")
|
||||||
.settings(Settings.builder()
|
.settings(Settings.builder()
|
||||||
|
@ -352,58 +355,62 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||||
|
|
||||||
// wait for the policy to become active
|
// wait for the policy to become active
|
||||||
assertBusy(() -> assertNotNull(client.indexLifecycle()
|
assertBusy(() -> assertNotNull(client.indexLifecycle()
|
||||||
.explainLifecycle(new ExplainLifecycleRequest("my_index"), RequestOptions.DEFAULT)
|
.explainLifecycle(new ExplainLifecycleRequest("my_index-1"), RequestOptions.DEFAULT)
|
||||||
.getIndexResponses().get("my_index").getAction()));
|
.getIndexResponses().get("my_index-1").getAction()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// tag::ilm-explain-lifecycle-request
|
// tag::ilm-explain-lifecycle-request
|
||||||
ExplainLifecycleRequest request =
|
ExplainLifecycleRequest request =
|
||||||
new ExplainLifecycleRequest("my_index", "other_index"); // <1>
|
new ExplainLifecycleRequest("my_index-1", "other_index"); // <1>
|
||||||
// end::ilm-explain-lifecycle-request
|
// end::ilm-explain-lifecycle-request
|
||||||
|
|
||||||
// tag::ilm-explain-lifecycle-execute
|
|
||||||
ExplainLifecycleResponse response = client.indexLifecycle()
|
|
||||||
.explainLifecycle(request, RequestOptions.DEFAULT);
|
|
||||||
// end::ilm-explain-lifecycle-execute
|
|
||||||
assertNotNull(response);
|
|
||||||
|
|
||||||
// tag::ilm-explain-lifecycle-response
|
assertBusy(() -> {
|
||||||
Map<String, IndexLifecycleExplainResponse> indices =
|
// tag::ilm-explain-lifecycle-execute
|
||||||
response.getIndexResponses();
|
ExplainLifecycleResponse response = client.indexLifecycle()
|
||||||
IndexLifecycleExplainResponse myIndex = indices.get("my_index");
|
.explainLifecycle(request, RequestOptions.DEFAULT);
|
||||||
String policyName = myIndex.getPolicyName(); // <1>
|
// end::ilm-explain-lifecycle-execute
|
||||||
boolean isManaged = myIndex.managedByILM(); // <2>
|
assertNotNull(response);
|
||||||
|
|
||||||
String phase = myIndex.getPhase(); // <3>
|
// tag::ilm-explain-lifecycle-response
|
||||||
long phaseTime = myIndex.getPhaseTime(); // <4>
|
Map<String, IndexLifecycleExplainResponse> indices =
|
||||||
String action = myIndex.getAction(); // <5>
|
response.getIndexResponses();
|
||||||
long actionTime = myIndex.getActionTime();
|
IndexLifecycleExplainResponse myIndex = indices.get("my_index-1");
|
||||||
String step = myIndex.getStep(); // <6>
|
String policyName = myIndex.getPolicyName(); // <1>
|
||||||
long stepTime = myIndex.getStepTime();
|
boolean isManaged = myIndex.managedByILM(); // <2>
|
||||||
|
|
||||||
String failedStep = myIndex.getFailedStep(); // <7>
|
String phase = myIndex.getPhase(); // <3>
|
||||||
// end::ilm-explain-lifecycle-response
|
long phaseTime = myIndex.getPhaseTime(); // <4>
|
||||||
assertEquals("my_policy", policyName);
|
String action = myIndex.getAction(); // <5>
|
||||||
assertTrue(isManaged);
|
long actionTime = myIndex.getActionTime();
|
||||||
|
String step = myIndex.getStep(); // <6>
|
||||||
|
long stepTime = myIndex.getStepTime();
|
||||||
|
|
||||||
assertEquals("hot", phase);
|
String failedStep = myIndex.getFailedStep(); // <7>
|
||||||
assertNotEquals(0, phaseTime);
|
// end::ilm-explain-lifecycle-response
|
||||||
assertEquals("rollover", action);
|
|
||||||
assertNotEquals(0, actionTime);
|
|
||||||
assertEquals("check-rollover-ready", step);
|
|
||||||
assertNotEquals(0, stepTime);
|
|
||||||
|
|
||||||
assertNull(failedStep);
|
assertEquals("my_policy", policyName);
|
||||||
|
assertTrue(isManaged);
|
||||||
|
|
||||||
IndexLifecycleExplainResponse otherIndex = indices.get("other_index");
|
assertEquals("hot", phase);
|
||||||
assertFalse(otherIndex.managedByILM());
|
assertNotEquals(0, phaseTime);
|
||||||
assertNull(otherIndex.getPolicyName());
|
assertEquals("rollover", action);
|
||||||
assertNull(otherIndex.getPhase());
|
assertNotEquals(0, actionTime);
|
||||||
assertNull(otherIndex.getAction());
|
assertEquals("check-rollover-ready", step);
|
||||||
assertNull(otherIndex.getStep());
|
assertNotEquals(0, stepTime);
|
||||||
assertNull(otherIndex.getFailedStep());
|
|
||||||
assertNull(otherIndex.getPhaseExecutionInfo());
|
assertNull(failedStep);
|
||||||
assertNull(otherIndex.getStepInfo());
|
|
||||||
|
IndexLifecycleExplainResponse otherIndex = indices.get("other_index");
|
||||||
|
assertFalse(otherIndex.managedByILM());
|
||||||
|
assertNull(otherIndex.getPolicyName());
|
||||||
|
assertNull(otherIndex.getPhase());
|
||||||
|
assertNull(otherIndex.getAction());
|
||||||
|
assertNull(otherIndex.getStep());
|
||||||
|
assertNull(otherIndex.getFailedStep());
|
||||||
|
assertNull(otherIndex.getPhaseExecutionInfo());
|
||||||
|
assertNull(otherIndex.getStepInfo());
|
||||||
|
});
|
||||||
|
|
||||||
// tag::ilm-explain-lifecycle-execute-listener
|
// tag::ilm-explain-lifecycle-execute-listener
|
||||||
ActionListener<ExplainLifecycleResponse> listener =
|
ActionListener<ExplainLifecycleResponse> listener =
|
||||||
|
|
|
@ -353,6 +353,13 @@ index format must match pattern '^.*-\\d+$', for example (`logs-000001`).
|
||||||
The managed index must set `index.lifecycle.rollover_alias` as the
|
The managed index must set `index.lifecycle.rollover_alias` as the
|
||||||
alias to rollover. The index must also be the write index for the alias.
|
alias to rollover. The index must also be the write index for the alias.
|
||||||
|
|
||||||
|
[IMPORTANT]
|
||||||
|
If a policy using the Rollover action is used on a <<ccr-put-follow,follower
|
||||||
|
index>>, policy execution will wait until the leader index rolls over (or has
|
||||||
|
<<skipping-rollover, otherwise been marked as complete>>), then convert the
|
||||||
|
follower index into a regular index as if <<ilm-unfollow-action,the Unfollow
|
||||||
|
action>> had been used instead of rolling over.
|
||||||
|
|
||||||
For example, if an index to be managed has an alias `my_data`. The managed
|
For example, if an index to be managed has an alias `my_data`. The managed
|
||||||
index "my_index" must be the write index for the alias. For more information, read
|
index "my_index" must be the write index for the alias. For more information, read
|
||||||
<<indices-rollover-is-write-index,Write Index Alias Behavior>>.
|
<<indices-rollover-is-write-index,Write Index Alias Behavior>>.
|
||||||
|
@ -578,6 +585,13 @@ PUT _ilm/policy/my_policy
|
||||||
|
|
||||||
NOTE: Index will be be made read-only when this action is run
|
NOTE: Index will be be made read-only when this action is run
|
||||||
(see: <<dynamic-index-settings,index.blocks.write>>)
|
(see: <<dynamic-index-settings,index.blocks.write>>)
|
||||||
|
[IMPORTANT]
|
||||||
|
If a policy using the Shrink action is used on a <<ccr-put-follow,follower
|
||||||
|
index>>, policy execution will wait until the leader index rolls over (or has
|
||||||
|
<<skipping-rollover, otherwise been marked as complete>>), then convert the
|
||||||
|
follower index into a regular index as if <<ilm-unfollow-action,the Unfollow
|
||||||
|
action>> had been used before shrink is applied, as shrink cannot be safely
|
||||||
|
applied to follower indices.
|
||||||
|
|
||||||
This action shrinks an existing index into a new index with fewer primary
|
This action shrinks an existing index into a new index with fewer primary
|
||||||
shards. It calls the <<indices-shrink-index,Shrink API>> to shrink the index.
|
shards. It calls the <<indices-shrink-index,Shrink API>> to shrink the index.
|
||||||
|
@ -622,11 +636,27 @@ PUT _ilm/policy/my_policy
|
||||||
[[ilm-unfollow-action]]
|
[[ilm-unfollow-action]]
|
||||||
==== Unfollow
|
==== Unfollow
|
||||||
|
|
||||||
|
[IMPORTANT]
|
||||||
|
This action may be used explicitly, as shown below, but this action is also run
|
||||||
|
before <<ilm-rollover-action,the Rollover action>> and <<ilm-shrink-action,the
|
||||||
|
Shrink action>> as described in the documentation for those actions.
|
||||||
|
|
||||||
This action turns a {ref}/ccr-apis.html[ccr] follower index
|
This action turns a {ref}/ccr-apis.html[ccr] follower index
|
||||||
into a regular index. This can be desired when moving follower
|
into a regular index. This can be desired when moving follower
|
||||||
indices into the next phase. Also certain actions like shrink
|
indices into the next phase. Also certain actions like shrink
|
||||||
and rollover can then be performed safely on follower indices.
|
and rollover can then be performed safely on follower indices.
|
||||||
|
|
||||||
|
This action will wait until is it safe to convert a follower index into a
|
||||||
|
regular index. In particular, the following conditions must be met:
|
||||||
|
|
||||||
|
* The leader index must have `index.lifecycle.indexing_complete` set to `true`.
|
||||||
|
This happens automatically if the leader index is rolled over using
|
||||||
|
<<ilm-rollover-action,the Rollover action>>, or may be set manually using
|
||||||
|
the <<indices-update-settings,Index Settings API>>.
|
||||||
|
* All operations performed on the leader index must have been replicated to the
|
||||||
|
follower index. This ensures that no operations will be lost when the index is
|
||||||
|
converted into a regular index.
|
||||||
|
|
||||||
If the unfollow action encounters a follower index then
|
If the unfollow action encounters a follower index then
|
||||||
the following operations will be performed on it:
|
the following operations will be performed on it:
|
||||||
|
|
||||||
|
|
|
@ -123,6 +123,7 @@ When the rollover is performed, the newly-created index is set as the write
|
||||||
index for the rolled over alias. Documents sent to the alias are indexed into
|
index for the rolled over alias. Documents sent to the alias are indexed into
|
||||||
the new index, enabling indexing to continue uninterrupted.
|
the new index, enabling indexing to continue uninterrupted.
|
||||||
|
|
||||||
|
[[skipping-rollover]]
|
||||||
=== Skipping Rollover
|
=== Skipping Rollover
|
||||||
|
|
||||||
The `index.lifecycle.indexing_complete` setting indicates to {ilm} whether this
|
The `index.lifecycle.indexing_complete` setting indicates to {ilm} whether this
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
public class ReadOnlyAction implements LifecycleAction {
|
public class ReadOnlyAction implements LifecycleAction {
|
||||||
public static final String NAME = "readonly";
|
public static final String NAME = "readonly";
|
||||||
public static final ReadOnlyAction INSTANCE = new ReadOnlyAction();
|
|
||||||
|
|
||||||
private static final ObjectParser<ReadOnlyAction, Void> PARSER = new ObjectParser<>(NAME, false, ReadOnlyAction::new);
|
private static final ObjectParser<ReadOnlyAction, Void> PARSER = new ObjectParser<>(NAME, false, ReadOnlyAction::new);
|
||||||
|
|
||||||
|
|
|
@ -6,14 +6,12 @@
|
||||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
|
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -44,8 +42,6 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
||||||
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
|
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
|
||||||
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
|
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
|
||||||
static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);
|
static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);
|
||||||
private static final Phase EMPTY_WARM_PHASE = new Phase("warm", TimeValue.ZERO,
|
|
||||||
Collections.singletonMap("readonly", ReadOnlyAction.INSTANCE));
|
|
||||||
private static Map<String, Set<String>> ALLOWED_ACTIONS = new HashMap<>();
|
private static Map<String, Set<String>> ALLOWED_ACTIONS = new HashMap<>();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -72,6 +68,13 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
||||||
for (String phaseName : VALID_PHASES) {
|
for (String phaseName : VALID_PHASES) {
|
||||||
Phase phase = phases.get(phaseName);
|
Phase phase = phases.get(phaseName);
|
||||||
if (phase != null) {
|
if (phase != null) {
|
||||||
|
Map<String, LifecycleAction> actions = phase.getActions();
|
||||||
|
if (actions.containsKey(UnfollowAction.NAME) == false
|
||||||
|
&& (actions.containsKey(RolloverAction.NAME) || actions.containsKey(ShrinkAction.NAME))) {
|
||||||
|
Map<String, LifecycleAction> actionMap = new HashMap<>(phase.getActions());
|
||||||
|
actionMap.put(UnfollowAction.NAME, new UnfollowAction());
|
||||||
|
phase = new Phase(phase.getName(), phase.getMinimumAge(), actionMap);
|
||||||
|
}
|
||||||
orderedPhases.add(phase);
|
orderedPhases.add(phase);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,6 +152,32 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
||||||
assertTrue(isSorted(TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap), Phase::getName, VALID_PHASES));
|
assertTrue(isSorted(TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap), Phase::getName, VALID_PHASES));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUnfollowInjections() {
|
||||||
|
assertTrue(isUnfollowInjected("hot", RolloverAction.NAME));
|
||||||
|
assertTrue(isUnfollowInjected("warm", ShrinkAction.NAME));
|
||||||
|
|
||||||
|
assertFalse(isUnfollowInjected("hot", SetPriorityAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("warm", SetPriorityAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("warm", AllocateAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("warm", ReadOnlyAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("warm", ForceMergeAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("cold", SetPriorityAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("cold", AllocateAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("cold", FreezeAction.NAME));
|
||||||
|
assertFalse(isUnfollowInjected("delete", DeleteAction.NAME));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isUnfollowInjected(String phaseName, String actionName) {
|
||||||
|
Map<String, Phase> phaseMap = new HashMap<>();
|
||||||
|
Map<String, LifecycleAction> actionsMap = new HashMap<>();
|
||||||
|
actionsMap.put(actionName, getTestAction(actionName));
|
||||||
|
Phase warmPhase = new Phase(phaseName, TimeValue.ZERO, actionsMap);
|
||||||
|
phaseMap.put(phaseName, warmPhase);
|
||||||
|
List<Phase> phases = TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap);
|
||||||
|
Phase processedWarmPhase = phases.stream().filter(phase -> phase.getName().equals(phaseName)).findFirst().get();
|
||||||
|
return processedWarmPhase.getActions().containsKey("unfollow");
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetOrderedActionsInvalidPhase() {
|
public void testGetOrderedActionsInvalidPhase() {
|
||||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> TimeseriesLifecycleType.INSTANCE
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> TimeseriesLifecycleType.INSTANCE
|
||||||
|
|
|
@ -274,6 +274,97 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUnfollowInjectedBeforeShrink() throws Exception {
|
||||||
|
final String indexName = "shrink-test";
|
||||||
|
final String shrunkenIndexName = "shrink-" + indexName;
|
||||||
|
final String policyName = "shrink-test-policy";
|
||||||
|
|
||||||
|
if ("leader".equals(targetCluster)) {
|
||||||
|
Settings indexSettings = Settings.builder()
|
||||||
|
.put("index.soft_deletes.enabled", true)
|
||||||
|
.put("index.number_of_shards", 3)
|
||||||
|
.put("index.number_of_replicas", 0)
|
||||||
|
.put("index.lifecycle.name", policyName) // this policy won't exist on the leader, that's fine
|
||||||
|
.build();
|
||||||
|
createIndex(indexName, indexSettings, "", "");
|
||||||
|
ensureGreen(indexName);
|
||||||
|
} else if ("follow".equals(targetCluster)) {
|
||||||
|
// Create a policy with just a Shrink action on the follower
|
||||||
|
final XContentBuilder builder = jsonBuilder();
|
||||||
|
builder.startObject();
|
||||||
|
{
|
||||||
|
builder.startObject("policy");
|
||||||
|
{
|
||||||
|
builder.startObject("phases");
|
||||||
|
{
|
||||||
|
builder.startObject("warm");
|
||||||
|
{
|
||||||
|
builder.startObject("actions");
|
||||||
|
{
|
||||||
|
builder.startObject("shrink");
|
||||||
|
{
|
||||||
|
builder.field("number_of_shards", 1);
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
|
||||||
|
// Sometimes throw in an extraneous unfollow just to check it doesn't break anything
|
||||||
|
if (randomBoolean()) {
|
||||||
|
builder.startObject("cold");
|
||||||
|
{
|
||||||
|
builder.startObject("actions");
|
||||||
|
{
|
||||||
|
builder.startObject("unfollow");
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
|
||||||
|
final Request request = new Request("PUT", "_ilm/policy/" + policyName);
|
||||||
|
request.setJsonEntity(Strings.toString(builder));
|
||||||
|
assertOK(client().performRequest(request));
|
||||||
|
|
||||||
|
// Follow the index
|
||||||
|
followIndex(indexName, indexName);
|
||||||
|
// Make sure it actually took
|
||||||
|
assertBusy(() -> assertTrue(indexExists(indexName)));
|
||||||
|
// This should now be in the "warm" phase waiting for the index to be ready to unfollow
|
||||||
|
assertBusy(() -> assertILMPolicy(client(), indexName, policyName, "warm", "unfollow", "wait-for-indexing-complete"));
|
||||||
|
|
||||||
|
// Set the indexing_complete flag on the leader so the index will actually unfollow
|
||||||
|
try (RestClient leaderClient = buildLeaderClient()) {
|
||||||
|
updateIndexSettings(leaderClient, indexName, Settings.builder()
|
||||||
|
.put("index.lifecycle.indexing_complete", true)
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the setting to get replicated
|
||||||
|
assertBusy(() -> assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true")));
|
||||||
|
|
||||||
|
// We can't reliably check that the index is unfollowed, because ILM
|
||||||
|
// moves through the unfollow and shrink actions so fast that the
|
||||||
|
// index often disappears between assertBusy checks
|
||||||
|
|
||||||
|
// Wait for the index to continue with its lifecycle and be shrunk
|
||||||
|
assertBusy(() -> assertTrue(indexExists(shrunkenIndexName)));
|
||||||
|
|
||||||
|
// Wait for the index to complete its policy
|
||||||
|
assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, "completed", "completed", "completed"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void putILMPolicy(String name, String maxSize, Integer maxDocs, TimeValue maxAge) throws IOException {
|
private static void putILMPolicy(String name, String maxSize, Integer maxDocs, TimeValue maxAge) throws IOException {
|
||||||
final Request request = new Request("PUT", "_ilm/policy/" + name);
|
final Request request = new Request("PUT", "_ilm/policy/" + name);
|
||||||
XContentBuilder builder = jsonBuilder();
|
XContentBuilder builder = jsonBuilder();
|
||||||
|
@ -299,7 +390,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
}
|
}
|
||||||
{
|
if (randomBoolean()) {
|
||||||
builder.startObject("unfollow");
|
builder.startObject("unfollow");
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
}
|
}
|
||||||
|
@ -310,6 +401,11 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||||
{
|
{
|
||||||
builder.startObject("actions");
|
builder.startObject("actions");
|
||||||
{
|
{
|
||||||
|
// Sometimes throw in an extraneous unfollow just to check it doesn't break anything
|
||||||
|
if (randomBoolean()) {
|
||||||
|
builder.startObject("unfollow");
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
builder.startObject("readonly");
|
builder.startObject("readonly");
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
}
|
}
|
||||||
|
@ -338,13 +434,26 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertILMPolicy(RestClient client, String index, String policy, String expectedPhase) throws IOException {
|
private static void assertILMPolicy(RestClient client, String index, String policy, String expectedPhase) throws IOException {
|
||||||
|
assertILMPolicy(client, index, policy, expectedPhase, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertILMPolicy(RestClient client, String index, String policy, String expectedPhase,
|
||||||
|
String expectedAction, String expectedStep) throws IOException {
|
||||||
final Request request = new Request("GET", "/" + index + "/_ilm/explain");
|
final Request request = new Request("GET", "/" + index + "/_ilm/explain");
|
||||||
Map<String, Object> response = toMap(client.performRequest(request));
|
Map<String, Object> response = toMap(client.performRequest(request));
|
||||||
LOGGER.info("response={}", response);
|
LOGGER.info("response={}", response);
|
||||||
Map<?, ?> explanation = (Map<?, ?>) ((Map<?, ?>) response.get("indices")).get(index);
|
Map<?, ?> explanation = (Map<?, ?>) ((Map<?, ?>) response.get("indices")).get(index);
|
||||||
assertThat(explanation.get("managed"), is(true));
|
assertThat(explanation.get("managed"), is(true));
|
||||||
assertThat(explanation.get("policy"), equalTo(policy));
|
assertThat(explanation.get("policy"), equalTo(policy));
|
||||||
assertThat(explanation.get("phase"), equalTo(expectedPhase));
|
if (expectedPhase != null) {
|
||||||
|
assertThat(explanation.get("phase"), equalTo(expectedPhase));
|
||||||
|
}
|
||||||
|
if (expectedAction != null) {
|
||||||
|
assertThat(explanation.get("action"), equalTo(expectedAction));
|
||||||
|
}
|
||||||
|
if (expectedStep != null) {
|
||||||
|
assertThat(explanation.get("step"), equalTo(expectedStep));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void updateIndexSettings(RestClient client, String index, Settings settings) throws IOException {
|
private static void updateIndexSettings(RestClient client, String index, Settings settings) throws IOException {
|
||||||
|
|
|
@ -759,6 +759,42 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||||
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
|
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMoveToInjectedStep() throws Exception {
|
||||||
|
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
|
||||||
|
createNewSingletonPolicy("warm", new ShrinkAction(1), TimeValue.timeValueHours(12));
|
||||||
|
|
||||||
|
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
|
||||||
|
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
||||||
|
|
||||||
|
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(new StepKey("new", "complete", "complete"))));
|
||||||
|
|
||||||
|
// Move to a step from the injected unfollow action
|
||||||
|
Request moveToStepRequest = new Request("POST", "_ilm/move/" + index);
|
||||||
|
moveToStepRequest.setJsonEntity("{\n" +
|
||||||
|
" \"current_step\": { \n" +
|
||||||
|
" \"phase\": \"new\",\n" +
|
||||||
|
" \"action\": \"complete\",\n" +
|
||||||
|
" \"name\": \"complete\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" \"next_step\": { \n" +
|
||||||
|
" \"phase\": \"warm\",\n" +
|
||||||
|
" \"action\": \"unfollow\",\n" +
|
||||||
|
" \"name\": \"wait-for-indexing-complete\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
"}");
|
||||||
|
// If we get an OK on this request we have successfully moved to the injected step
|
||||||
|
assertOK(client().performRequest(moveToStepRequest));
|
||||||
|
|
||||||
|
// Make sure we actually move on to and execute the shrink action
|
||||||
|
assertBusy(() -> {
|
||||||
|
assertTrue(indexExists(shrunkenIndex));
|
||||||
|
assertTrue(aliasExists(shrunkenIndex, index));
|
||||||
|
assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private void createFullPolicy(TimeValue hotTime) throws IOException {
|
private void createFullPolicy(TimeValue hotTime) throws IOException {
|
||||||
Map<String, LifecycleAction> hotActions = new HashMap<>();
|
Map<String, LifecycleAction> hotActions = new HashMap<>();
|
||||||
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
|
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
|
||||||
|
|
Loading…
Reference in New Issue