An index with an ILM policy that has a rollover action in one of the phases was rolled over when the ILM conditions dictated regardless if it was already rolled over (eg. manually after modifying an index template in order to force the creation of a new index that uses the new mappings). This changes this behaviour and has ILM check if the index it's about to roll has not been rolled over in the meantime. (cherry picked from commit 37d6106feeb9f9369519117c88a9e7e30f3ac797) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
36cabbae80
commit
4506b37ed5
|
@ -49,6 +49,13 @@ public class RolloverStep extends AsyncActionStep {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (indexMetaData.getRolloverInfos().get(rolloverAlias) != null) {
|
||||||
|
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
|
||||||
|
indexMetaData.getIndex().getName(), rolloverAlias);
|
||||||
|
listener.onResponse(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (indexMetaData.getAliases().containsKey(rolloverAlias) == false) {
|
if (indexMetaData.getAliases().containsKey(rolloverAlias) == false) {
|
||||||
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
|
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
|
||||||
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
|
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
|
||||||
|
|
|
@ -53,6 +53,13 @@ public class WaitForRolloverReadyStep extends AsyncWaitStep {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (indexMetaData.getRolloverInfos().get(rolloverAlias) != null) {
|
||||||
|
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
|
||||||
|
indexMetaData.getIndex().getName(), rolloverAlias);
|
||||||
|
listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// The order of the following checks is important in ways which may not be obvious.
|
// The order of the following checks is important in ways which may not be obvious.
|
||||||
|
|
||||||
// First, figure out if 1) The configured alias points to this index, and if so,
|
// First, figure out if 1) The configured alias points to this index, and if so,
|
||||||
|
|
|
@ -8,6 +8,8 @@ package org.elasticsearch.xpack.core.ilm;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
|
||||||
|
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
|
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
|
||||||
import org.elasticsearch.client.AdminClient;
|
import org.elasticsearch.client.AdminClient;
|
||||||
|
@ -15,6 +17,7 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.IndicesAdminClient;
|
import org.elasticsearch.client.IndicesAdminClient;
|
||||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
@ -25,6 +28,7 @@ import java.util.Collections;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
|
||||||
public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
||||||
|
|
||||||
|
@ -154,6 +158,39 @@ public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
||||||
assertEquals(true, actionCompleted.get());
|
assertEquals(true, actionCompleted.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPerformActionSkipsRolloverForAlreadyRolledIndex() {
|
||||||
|
String rolloverAlias = randomAlphaOfLength(5);
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||||
|
.putAlias(AliasMetaData.builder(rolloverAlias))
|
||||||
|
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias))
|
||||||
|
.putRolloverInfo(new RolloverInfo(rolloverAlias,
|
||||||
|
Collections.singletonList(new MaxSizeCondition(new ByteSizeValue(2L))),
|
||||||
|
System.currentTimeMillis())
|
||||||
|
)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
|
||||||
|
RolloverStep step = createRandomInstance();
|
||||||
|
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||||
|
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||||
|
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||||
|
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||||
|
|
||||||
|
step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete) {
|
||||||
|
assertThat(complete, is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
throw new AssertionError("Unexpected method call", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Mockito.verify(indicesClient, Mockito.never()).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
public void testPerformActionFailure() {
|
public void testPerformActionFailure() {
|
||||||
String alias = randomAlphaOfLength(5);
|
String alias = randomAlphaOfLength(5);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.rollover.Condition;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
|
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
|
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
|
import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
|
||||||
|
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
|
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
|
||||||
import org.elasticsearch.client.AdminClient;
|
import org.elasticsearch.client.AdminClient;
|
||||||
|
@ -29,6 +30,7 @@ import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -36,6 +38,7 @@ import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForRolloverReadyStep> {
|
public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForRolloverReadyStep> {
|
||||||
|
|
||||||
|
@ -173,6 +176,64 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
|
||||||
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
|
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testEvaluateDoesntTriggerRolloverForIndexManuallyRolledOnLifecycleRolloverAlias() {
|
||||||
|
String rolloverAlias = randomAlphaOfLength(5);
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||||
|
.putAlias(AliasMetaData.builder(rolloverAlias))
|
||||||
|
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias))
|
||||||
|
.putRolloverInfo(new RolloverInfo(rolloverAlias, Collections.singletonList(new MaxSizeCondition(new ByteSizeValue(2L))),
|
||||||
|
System.currentTimeMillis()))
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
|
||||||
|
WaitForRolloverReadyStep step = createRandomInstance();
|
||||||
|
IndicesAdminClient indicesClient = indicesAdminClientMock();
|
||||||
|
|
||||||
|
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete, ToXContentObject informationContext) {
|
||||||
|
assertThat(complete, is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
throw new AssertionError("Unexpected method call", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Mockito.verify(indicesClient, Mockito.never()).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEvaluateTriggersRolloverForIndexManuallyRolledOnDifferentAlias() {
|
||||||
|
String rolloverAlias = randomAlphaOfLength(5);
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||||
|
.putAlias(AliasMetaData.builder(rolloverAlias))
|
||||||
|
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias))
|
||||||
|
.putRolloverInfo(new RolloverInfo(randomAlphaOfLength(5),
|
||||||
|
Collections.singletonList(new MaxSizeCondition(new ByteSizeValue(2L))),
|
||||||
|
System.currentTimeMillis())
|
||||||
|
)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
|
||||||
|
WaitForRolloverReadyStep step = createRandomInstance();
|
||||||
|
IndicesAdminClient indicesClient = indicesAdminClientMock();
|
||||||
|
|
||||||
|
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete, ToXContentObject informationContext) {
|
||||||
|
assertThat(complete, is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
throw new AssertionError("Unexpected method call", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
public void testPerformActionWithIndexingComplete() {
|
public void testPerformActionWithIndexingComplete() {
|
||||||
String alias = randomAlphaOfLength(5);
|
String alias = randomAlphaOfLength(5);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||||
|
@ -399,4 +460,12 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
|
||||||
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias,
|
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias,
|
||||||
indexMetaData.getIndex().getName())));
|
indexMetaData.getIndex().getName())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private IndicesAdminClient indicesAdminClientMock() {
|
||||||
|
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||||
|
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||||
|
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||||
|
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||||
|
return indicesClient;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -769,20 +769,6 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||||
client().performRequest(addPolicyRequest);
|
client().performRequest(addPolicyRequest);
|
||||||
assertBusy(() -> assertTrue((boolean) explainIndex(originalIndex).getOrDefault("managed", false)));
|
assertBusy(() -> assertTrue((boolean) explainIndex(originalIndex).getOrDefault("managed", false)));
|
||||||
|
|
||||||
// Wait for rollover to error
|
|
||||||
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(new StepKey("hot", RolloverAction.NAME, ErrorStep.NAME))));
|
|
||||||
|
|
||||||
// Set indexing complete
|
|
||||||
Request setIndexingCompleteRequest = new Request("PUT", "/" + originalIndex + "/_settings");
|
|
||||||
setIndexingCompleteRequest.setJsonEntity("{\n" +
|
|
||||||
" \"index.lifecycle.indexing_complete\": true\n" +
|
|
||||||
"}");
|
|
||||||
client().performRequest(setIndexingCompleteRequest);
|
|
||||||
|
|
||||||
// Retry policy
|
|
||||||
Request retryRequest = new Request("POST", "/" + originalIndex + "/_ilm/retry");
|
|
||||||
client().performRequest(retryRequest);
|
|
||||||
|
|
||||||
// Wait for everything to be copacetic
|
// Wait for everything to be copacetic
|
||||||
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
|
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
|
||||||
}
|
}
|
||||||
|
@ -884,6 +870,79 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testILMRolloverOnManuallyRolledIndex() throws Exception {
|
||||||
|
String originalIndex = index + "-000001";
|
||||||
|
String secondIndex = index + "-000002";
|
||||||
|
String thirdIndex = index + "-000003";
|
||||||
|
|
||||||
|
// Configure ILM to run every second
|
||||||
|
Request updateLifecylePollSetting = new Request("PUT", "_cluster/settings");
|
||||||
|
updateLifecylePollSetting.setJsonEntity("{" +
|
||||||
|
" \"transient\": {\n" +
|
||||||
|
"\"indices.lifecycle.poll_interval\" : \"1s\" \n" +
|
||||||
|
" }\n" +
|
||||||
|
"}");
|
||||||
|
client().performRequest(updateLifecylePollSetting);
|
||||||
|
|
||||||
|
// Set up a policy with rollover
|
||||||
|
createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L));
|
||||||
|
Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
|
||||||
|
createIndexTemplate.setJsonEntity("{" +
|
||||||
|
"\"index_patterns\": [\""+ index + "-*\"], \n" +
|
||||||
|
" \"settings\": {\n" +
|
||||||
|
" \"number_of_shards\": 1,\n" +
|
||||||
|
" \"number_of_replicas\": 0,\n" +
|
||||||
|
" \"index.lifecycle.name\": \"" + policy+ "\", \n" +
|
||||||
|
" \"index.lifecycle.rollover_alias\": \"alias\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
"}");
|
||||||
|
client().performRequest(createIndexTemplate);
|
||||||
|
|
||||||
|
createIndexWithSettings(
|
||||||
|
originalIndex,
|
||||||
|
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
|
||||||
|
true
|
||||||
|
);
|
||||||
|
|
||||||
|
// Index a document
|
||||||
|
index(client(), originalIndex, "1", "foo", "bar");
|
||||||
|
Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh");
|
||||||
|
client().performRequest(refreshOriginalIndex);
|
||||||
|
|
||||||
|
// Manual rollover
|
||||||
|
Request rolloverRequest = new Request("POST", "/alias/_rollover");
|
||||||
|
rolloverRequest.setJsonEntity("{\n" +
|
||||||
|
" \"conditions\": {\n" +
|
||||||
|
" \"max_docs\": \"1\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
"}"
|
||||||
|
);
|
||||||
|
client().performRequest(rolloverRequest);
|
||||||
|
assertBusy(() -> assertTrue(indexExists(secondIndex)));
|
||||||
|
|
||||||
|
// Index another document into the original index so the ILM rollover policy condition is met
|
||||||
|
index(client(), originalIndex, "2", "foo", "bar");
|
||||||
|
client().performRequest(refreshOriginalIndex);
|
||||||
|
|
||||||
|
// Wait for the rollover policy to execute
|
||||||
|
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
|
||||||
|
|
||||||
|
// ILM should manage the second index after attempting (and skipping) rolling the original index
|
||||||
|
assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", true)));
|
||||||
|
|
||||||
|
// index some documents to trigger an ILM rollover
|
||||||
|
index(client(), "alias", "1", "foo", "bar");
|
||||||
|
index(client(), "alias", "2", "foo", "bar");
|
||||||
|
index(client(), "alias", "3", "foo", "bar");
|
||||||
|
Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh");
|
||||||
|
client().performRequest(refreshSecondIndex).getStatusLine();
|
||||||
|
|
||||||
|
// ILM should rollover the second index even though it skipped the first one
|
||||||
|
assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY)));
|
||||||
|
assertBusy(() -> assertTrue(indexExists(thirdIndex)));
|
||||||
|
}
|
||||||
|
|
||||||
private void createFullPolicy(TimeValue hotTime) throws IOException {
|
private void createFullPolicy(TimeValue hotTime) throws IOException {
|
||||||
Map<String, LifecycleAction> hotActions = new HashMap<>();
|
Map<String, LifecycleAction> hotActions = new HashMap<>();
|
||||||
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
|
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
|
||||||
|
|
Loading…
Reference in New Issue