[7.x] ILM add data stream support to the Shrink action (#57616) (#57884)

The shrink action creates a shrunken index with the target number of shards.
This commit makes the shrink action data stream aware: if the ILM-managed
index is part of a data stream, the shrink action swaps the original managed
index with the shrunken one in the data stream's backing indices and then
deletes the original index.

(cherry picked from commit 99aeed6acf4ae7cbdd97a3bcfe54c5d37ab7a574)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
Author: Andrei Dan <andrei.dan@elastic.co> (committed by GitHub)
Date: 2020-06-09 19:45:22 +01:00
Commit: 3945712c72 (parent: ea696198e9)
10 changed files with 650 additions and 77 deletions

File: org/elasticsearch/cluster/metadata/DataStream.java

@@ -102,6 +102,31 @@ public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject
return new DataStream(name, timeStampField, backingIndices, generation);
}
/**
* Replaces the specified backing index with a new index and returns a new {@code DataStream} instance with
* the modified backing indices. An {@code IllegalArgumentException} is thrown if the index to be replaced
* is not a backing index for this data stream or if it is the {@code DataStream}'s write index.
*
* @param existingBackingIndex the backing index to be replaced
* @param newBackingIndex the new index that will be part of the {@code DataStream}
* @return new {@code DataStream} instance with backing indices that contain replacement index instead of the specified
* existing index.
*/
public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBackingIndex) {
List<Index> backingIndices = new ArrayList<>(indices);
int backingIndexPosition = backingIndices.indexOf(existingBackingIndex);
if (backingIndexPosition == -1) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] ",
existingBackingIndex.getName(), name));
}
if (generation == (backingIndexPosition + 1)) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot replace backing index [%s] of data stream [%s] because " +
"it is the write index", existingBackingIndex.getName(), name));
}
backingIndices.set(backingIndexPosition, newBackingIndex);
return new DataStream(name, timeStampField, backingIndices, generation);
}
/**
 * Generates the name of the index that conforms to the default naming convention for backing indices
 * on data streams given the specified data stream name and generation.
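
For reference, a minimal usage sketch of the replaceBackingIndex API added above; the data stream and index names are illustrative, and UUIDs.randomBase64UUID comes from org.elasticsearch.common.UUIDs.

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.Index;

import java.util.Arrays;

class ReplaceBackingIndexSketch {
    static DataStream swapFirstGeneration() {
        // a data stream with two generations; generation 2 is the write index
        Index gen1 = new Index(DataStream.getDefaultBackingIndexName("foo-stream", 1), UUIDs.randomBase64UUID());
        Index gen2 = new Index(DataStream.getDefaultBackingIndexName("foo-stream", 2), UUIDs.randomBase64UUID());
        DataStream dataStream = new DataStream("foo-stream", "@timestamp", Arrays.asList(gen1, gen2));

        // swap generation 1 for its shrunken counterpart; the returned instance keeps
        // the data stream's name, timestamp field, and generation
        Index shrunken = new Index("shrink-" + gen1.getName(), UUIDs.randomBase64UUID());
        return dataStream.replaceBackingIndex(gen1, shrunken);
        // passing gen2 (the write index) instead would throw IllegalArgumentException
    }
}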

File: org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -108,4 +108,60 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
String expectedBackingIndexName = String.format(Locale.ROOT, ".ds-%s-%06d", dataStreamName, backingIndexNum);
assertThat(defaultBackingIndexName, equalTo(expectedBackingIndexName));
}
public void testReplaceBackingIndex() {
int numBackingIndices = randomIntBetween(2, 32);
int indexToReplace = randomIntBetween(1, numBackingIndices - 1) - 1;
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
List<Index> indices = new ArrayList<>(numBackingIndices);
for (int i = 1; i <= numBackingIndices; i++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random()));
DataStream updated = original.replaceBackingIndex(indices.get(indexToReplace), newBackingIndex);
assertThat(updated.getName(), equalTo(original.getName()));
assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
assertThat(updated.getIndices().size(), equalTo(numBackingIndices));
assertThat(updated.getIndices().get(indexToReplace), equalTo(newBackingIndex));
for (int i = 0; i < numBackingIndices; i++) {
if (i != indexToReplace) {
assertThat(updated.getIndices().get(i), equalTo(original.getIndices().get(i)));
}
}
}
public void testReplaceBackingIndexThrowsExceptionIfIndexNotPartOfDataStream() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
List<Index> indices = new ArrayList<>(numBackingIndices);
for (int i = 1; i <= numBackingIndices; i++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
Index standaloneIndex = new Index("index-foo", UUIDs.randomBase64UUID(random()));
Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random()));
expectThrows(IllegalArgumentException.class, () -> original.replaceBackingIndex(standaloneIndex, newBackingIndex));
}
public void testReplaceBackingIndexThrowsExceptionIfReplacingWriteIndex() {
int numBackingIndices = randomIntBetween(2, 32);
int writeIndexPosition = numBackingIndices - 1;
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
List<Index> indices = new ArrayList<>(numBackingIndices);
for (int i = 1; i <= numBackingIndices; i++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random()));
expectThrows(IllegalArgumentException.class, () -> original.replaceBackingIndex(indices.get(writeIndexPosition), newBackingIndex));
}
}

File: org/elasticsearch/xpack/core/ilm/BranchingStep.java

@@ -49,6 +49,11 @@ public class BranchingStep extends ClusterStateActionStep {
this.predicateValue = new SetOnce<>();
}
@Override
public boolean isRetryable() {
return true;
}
@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
IndexMetadata indexMetadata = clusterState.metadata().index(index);

File: org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStep.java (new file)

@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.Index;
import java.util.Locale;
import java.util.Objects;
/**
* This step replaces a data stream backing index with the target index, as part of the data stream's backing indices.
* Eg. if data stream `foo-stream` is backed by indices [`foo-stream-000001`, `foo-stream-000002`] and we'd like to replace the first
* generation index, `foo-stream-000001`, with `shrink-foo-stream-000001`, after this step the `foo-stream` data stream will contain
* the following indices
* <p>
* [`shrink-foo-stream-000001`, `foo-stream-000002`]
* <p>
* The `foo-stream-000001` index will continue to exist but will not be part of the data stream anymore.
* <p>
* As the last generation is the write index of the data stream, replacing the last generation index is not allowed.
* <p>
* This is useful in scenarios following a restore from snapshot operation where the restored index will take the place of the source
* index in the ILM lifecycle or in the case where we shrink an index and the shrunk index will take the place of the original index.
*/
public class ReplaceDataStreamBackingIndexStep extends ClusterStateActionStep {
public static final String NAME = "replace-datastream-backing-index";
private static final Logger logger = LogManager.getLogger(ReplaceDataStreamBackingIndexStep.class);
private final String targetIndexPrefix;
public ReplaceDataStreamBackingIndexStep(StepKey key, StepKey nextStepKey, String targetIndexPrefix) {
super(key, nextStepKey);
this.targetIndexPrefix = targetIndexPrefix;
}
@Override
public boolean isRetryable() {
return true;
}
public String getTargetIndexPrefix() {
return targetIndexPrefix;
}
@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
String originalIndex = index.getName();
final String targetIndexName = targetIndexPrefix + originalIndex;
IndexMetadata originalIndexMetadata = clusterState.metadata().index(index);
if (originalIndexMetadata == null) {
// Index must have been since deleted, skip the shrink action
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", NAME, index.getName());
return clusterState;
}
String policyName = originalIndexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
if (dataStream == null) {
String errorMessage = String.format(Locale.ROOT, "index [%s] is not part of a data stream. stopping execution of lifecycle " +
"[%s] until the index is added to a data stream", originalIndex, policyName);
logger.debug(errorMessage);
throw new IllegalStateException(errorMessage);
}
assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index";
if (dataStream.getWriteIndex().getIndex().getName().equals(originalIndex)) {
String errorMessage = String.format(Locale.ROOT, "index [%s] is the write index for data stream [%s]. stopping execution of " +
"lifecycle [%s] as a data stream's write index cannot be replaced. manually rolling over the index will resume the " +
"execution of the policy as the index will not be the data stream's write index anymore", originalIndex,
dataStream.getName(), policyName);
logger.debug(errorMessage);
throw new IllegalStateException(errorMessage);
}
IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName);
if (targetIndexMetadata == null) {
String errorMessage = String.format(Locale.ROOT, "target index [%s] doesn't exist. stopping execution of lifecycle [%s] for" +
" index [%s]", targetIndexName, policyName, originalIndex);
logger.debug(errorMessage);
throw new IllegalStateException(errorMessage);
}
Metadata.Builder newMetaData = Metadata.builder(clusterState.getMetadata())
.put(dataStream.getDataStream().replaceBackingIndex(index, targetIndexMetadata.getIndex()));
return ClusterState.builder(clusterState).metadata(newMetaData).build();
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), targetIndexPrefix);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ReplaceDataStreamBackingIndexStep other = (ReplaceDataStreamBackingIndexStep) obj;
return super.equals(obj) &&
Objects.equals(targetIndexPrefix, other.targetIndexPrefix);
}
}

File: org/elasticsearch/xpack/core/ilm/ShrinkAction.java

@@ -5,7 +5,11 @@
 */
package org.elasticsearch.xpack.core.ilm;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@@ -15,12 +19,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.function.BiPredicate;
/**
 * A {@link LifecycleAction} which shrinks the index.
@@ -29,6 +36,10 @@ public class ShrinkAction implements LifecycleAction {
public static final String NAME = "shrink";
public static final String SHRUNKEN_INDEX_PREFIX = "shrink-";
public static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards");
public static final String CONDITIONAL_SKIP_SHRINK_STEP = BranchingStep.NAME + "-check-prerequisites";
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
private static final Logger logger = LogManager.getLogger(ShrinkAction.class);
private static final ConstructingObjectParser<ShrinkAction, Void> PARSER =
new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0]));
@@ -85,7 +96,7 @@ public class ShrinkAction implements LifecycleAction {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
Settings readOnlySettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
-StepKey branchingKey = new StepKey(phase, NAME, BranchingStep.NAME);
+StepKey preShrinkBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_SHRINK_STEP);
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME);
StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME);
@@ -93,23 +104,77 @@
StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME);
StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME);
StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY);
StepKey aliasKey = new StepKey(phase, NAME, ShrinkSetAliasStep.NAME);
StepKey isShrunkIndexKey = new StepKey(phase, NAME, ShrunkenIndexCheckStep.NAME);
StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
-BranchingStep conditionalSkipShrinkStep = new BranchingStep(branchingKey, waitForNoFollowerStepKey, nextStepKey,
-(index, clusterState) -> clusterState.getMetadata().index(index).getNumberOfShards() == numberOfShards);
+BranchingStep conditionalSkipShrinkStep = new BranchingStep(preShrinkBranchingKey, waitForNoFollowerStepKey, nextStepKey,
+getSkipShrinkStepPredicate(numberOfShards));
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, readOnlyKey, client);
UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, setSingleNodeKey, client, readOnlySettings);
SetSingleNodeAllocateStep setSingleNodeStep = new SetSingleNodeAllocateStep(setSingleNodeKey, allocationRoutedKey, client);
CheckShrinkReadyStep checkShrinkReadyStep = new CheckShrinkReadyStep(allocationRoutedKey, shrinkKey);
ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, SHRUNKEN_INDEX_PREFIX);
ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, copyMetadataKey, SHRUNKEN_INDEX_PREFIX);
-CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, aliasKey, SHRUNKEN_INDEX_PREFIX,
-ShrunkenIndexCheckStep.NAME);
+CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, dataStreamCheckBranchingKey,
+SHRUNKEN_INDEX_PREFIX, ShrunkenIndexCheckStep.NAME);
// by the time we get to this step we have 2 indices, the source and the shrunken one. we now need to choose an index
// swapping strategy such that the shrunken index takes the place of the source index (which is also deleted).
// if the source index is part of a data stream it's a matter of replacing it with the shrunken index in the data stream and
// then deleting the source index; otherwise we'll use the alias management api to atomically transfer the aliases from source to
// the shrunken index and delete the source
BranchingStep isDataStreamBranchingStep = new BranchingStep(dataStreamCheckBranchingKey, aliasKey, replaceDataStreamIndexKey,
(index, clusterState) -> {
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
return indexAbstraction.getParentDataStream() != null;
});
ShrinkSetAliasStep aliasSwapAndDelete = new ShrinkSetAliasStep(aliasKey, isShrunkIndexKey, client, SHRUNKEN_INDEX_PREFIX);
ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(replaceDataStreamIndexKey,
deleteIndexKey, SHRUNKEN_INDEX_PREFIX);
DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, isShrunkIndexKey, client);
ShrunkenIndexCheckStep waitOnShrinkTakeover = new ShrunkenIndexCheckStep(isShrunkIndexKey, nextStepKey, SHRUNKEN_INDEX_PREFIX);
return Arrays.asList(conditionalSkipShrinkStep, waitForNoFollowersStep, readOnlyStep, setSingleNodeStep, checkShrinkReadyStep,
-shrink, allocated, copyMetadata, aliasSwapAndDelete, waitOnShrinkTakeover);
+shrink, allocated, copyMetadata, isDataStreamBranchingStep, aliasSwapAndDelete, waitOnShrinkTakeover,
+replaceDataStreamBackingIndex, deleteSourceIndexStep);
}
static BiPredicate<Index, ClusterState> getSkipShrinkStepPredicate(int targetNumberOfShards) {
return (index, clusterState) -> {
IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
if (indexMetadata == null) {
// Index must have been since deleted, skip the shrink action
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", NAME, index.getName());
return true;
}
if (indexMetadata.getNumberOfShards() == targetNumberOfShards) {
logger.debug("skipping [{}] lifecycle action for index [{}] because the index already has the target number of shards",
NAME, index.getName());
return true;
}
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
if (indexAbstraction.getParentDataStream() != null) {
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index";
if (dataStream.getWriteIndex().getIndex().getName().equals(index.getName())) {
String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
String errorMessage = String.format(Locale.ROOT,
"index [%s] is the write index for data stream [%s]. stopping execution of lifecycle [%s] as a data stream's " +
"write index cannot be shrunk. manually rolling over the index will resume the execution of the policy " +
"as the index will not be the data stream's write index anymore",
index.getName(), dataStream.getName(), policyName);
logger.debug(errorMessage);
throw new IllegalStateException(errorMessage);
}
}
return false;
};
}
@Override
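
To make the new branching behavior concrete, here is a sketch of how getSkipShrinkStepPredicate evaluates against a one-index data stream, mirroring the tests further down; it assumes the caller sits in the org.elasticsearch.xpack.core.ilm package (the predicate is package-private), and all names are illustrative.

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;

import java.util.Collections;

class SkipShrinkPredicateSketch {
    static boolean demo() {
        // a one-generation data stream: its only backing index is also the write index
        IndexMetadata writeIndex = IndexMetadata.builder(DataStream.getDefaultBackingIndexName("logs-foo", 1))
            .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build())
            .numberOfShards(5).numberOfReplicas(0)
            .build();
        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
            .metadata(Metadata.builder()
                .put(writeIndex, true)
                .put(new DataStream("logs-foo", "@timestamp", Collections.singletonList(writeIndex.getIndex())))
                .build())
            .build();

        // the target shard count equals the current one, so the action is a no-op and
        // the predicate skips it even though the index is a write index; asking for a
        // real shrink (e.g. 1 shard) here would throw IllegalStateException instead
        return ShrinkAction.getSkipShrinkStepPredicate(5).test(writeIndex.getIndex(), state);
    }
}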

File: org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java (new file)

@@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.Index;
import java.util.List;
import java.util.UUID;
import static org.elasticsearch.xpack.core.ilm.AbstractStepMasterTimeoutTestCase.emptyClusterState;
import static org.hamcrest.Matchers.is;
public class ReplaceDataStreamBackingIndexStepTests extends AbstractStepTestCase<ReplaceDataStreamBackingIndexStep> {
@Override
protected ReplaceDataStreamBackingIndexStep createRandomInstance() {
return new ReplaceDataStreamBackingIndexStep(randomStepKey(), randomStepKey(), randomAlphaOfLengthBetween(1, 10));
}
@Override
protected ReplaceDataStreamBackingIndexStep mutateInstance(ReplaceDataStreamBackingIndexStep instance) {
Step.StepKey key = instance.getKey();
Step.StepKey nextKey = instance.getNextStepKey();
String indexPrefix = instance.getTargetIndexPrefix();
switch (between(0, 2)) {
case 0:
key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 1:
nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 2:
indexPrefix = randomValueOtherThan(indexPrefix, () -> randomAlphaOfLengthBetween(1, 10));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new ReplaceDataStreamBackingIndexStep(key, nextKey, indexPrefix);
}
@Override
protected ReplaceDataStreamBackingIndexStep copyInstance(ReplaceDataStreamBackingIndexStep instance) {
return new ReplaceDataStreamBackingIndexStep(instance.getKey(), instance.getNextStepKey(), instance.getTargetIndexPrefix());
}
public void testPerformActionThrowsExceptionIfIndexIsNotPartOfDataStream() {
String indexName = randomAlphaOfLength(10);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata =
IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder().put(sourceIndexMetadata, true).build()
).build();
expectThrows(IllegalStateException.class,
() -> createRandomInstance().performAction(sourceIndexMetadata.getIndex(), clusterState));
}
public void testPerformActionThrowsExceptionIfIndexIsTheDataStreamWriteIndex() {
String dataStreamName = randomAlphaOfLength(10);
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata =
IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder().put(sourceIndexMetadata, true).put(new DataStream(dataStreamName, "timestamp",
org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex()))).build()
).build();
expectThrows(IllegalStateException.class,
() -> createRandomInstance().performAction(sourceIndexMetadata.getIndex(), clusterState));
}
public void testPerformActionThrowsExceptionIfTargetIndexIsMissing() {
String dataStreamName = randomAlphaOfLength(10);
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
.build();
String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2);
IndexMetadata writeIndexMetadata = IndexMetadata.builder(writeIndexName)
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
.build();
List<Index> backingIndices = org.elasticsearch.common.collect.List
.of(sourceIndexMetadata.getIndex(), writeIndexMetadata.getIndex());
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder()
.put(sourceIndexMetadata, true)
.put(writeIndexMetadata, true)
.put(new DataStream(dataStreamName, "timestamp", backingIndices))
.build()
).build();
expectThrows(IllegalStateException.class,
() -> createRandomInstance().performAction(sourceIndexMetadata.getIndex(), clusterState));
}
public void testPerformActionIsNoOpIfIndexIsMissing() {
ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT).build();
Index missingIndex = new Index("missing", UUID.randomUUID().toString());
ReplaceDataStreamBackingIndexStep replaceSourceIndexStep = createRandomInstance();
ClusterState newState = replaceSourceIndexStep.performAction(missingIndex, initialState);
assertThat(newState, is(initialState));
}
public void testPerformAction() {
String dataStreamName = randomAlphaOfLength(10);
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
.build();
String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2);
IndexMetadata writeIndexMetadata = IndexMetadata.builder(writeIndexName)
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
.build();
String indexPrefix = "test-prefix-";
String targetIndex = indexPrefix + indexName;
IndexMetadata targetIndexMetadata = IndexMetadata.builder(targetIndex).settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
List<Index> backingIndices = org.elasticsearch.common.collect.List
.of(sourceIndexMetadata.getIndex(), writeIndexMetadata.getIndex());
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder()
.put(sourceIndexMetadata, true)
.put(writeIndexMetadata, true)
.put(new DataStream(dataStreamName, "timestamp", backingIndices))
.put(targetIndexMetadata, true)
.build()
).build();
ReplaceDataStreamBackingIndexStep replaceSourceIndexStep =
new ReplaceDataStreamBackingIndexStep(randomStepKey(), randomStepKey(), indexPrefix);
ClusterState newState = replaceSourceIndexStep.performAction(sourceIndexMetadata.getIndex(), clusterState);
DataStream updatedDataStream = newState.metadata().dataStreams().get(dataStreamName);
assertThat(updatedDataStream.getIndices().size(), is(2));
assertThat(updatedDataStream.getIndices().get(0), is(targetIndexMetadata.getIndex()));
}
}

File: org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java

@@ -8,18 +8,23 @@ package org.elasticsearch.xpack.core.ilm;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import static org.elasticsearch.xpack.core.ilm.ShrinkAction.getSkipShrinkStepPredicate;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
@@ -116,18 +121,66 @@
.setStepTime(0L)
.build().asMap())
.numberOfShards(numShards).numberOfReplicas(0))).build();
-ClusterState newState = step.performAction(state.metadata().index(indexName).getIndex(), state);
+step.performAction(state.metadata().index(indexName).getIndex(), state);
assertThat(step.getNextStepKey(), equalTo(steps.get(1).getKey()));
}
public void testNoOpShrinkDoesntFailOnDataStreamWriteIndex() {
String dataStreamName = randomAlphaOfLength(10);
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(1).numberOfReplicas(1)
.build();
List<Index> backingIndices = org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex());
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(
Metadata.builder()
.put(sourceIndexMetadata, true)
.put(new DataStream(dataStreamName, "timestamp", backingIndices))
.build()
).build();
boolean skipShrink = getSkipShrinkStepPredicate(1).test(sourceIndexMetadata.getIndex(), clusterState);
assertThat("shrink is skipped even though it is applied to a data stream's write index because it would be a no-op",
skipShrink, is(true));
}
public void testShrinkFailsOnDataStreamWriteIndex() {
String dataStreamName = randomAlphaOfLength(10);
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(5).numberOfReplicas(1)
.build();
List<Index> backingIndices = org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex());
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(
Metadata.builder()
.put(sourceIndexMetadata, true)
.put(new DataStream(dataStreamName, "timestamp", backingIndices))
.build()
).build();
expectThrows(IllegalStateException.class, () -> getSkipShrinkStepPredicate(1).test(sourceIndexMetadata.getIndex(), clusterState));
}
public void testShrinkSkipIfIndexIsDeleted() {
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
Index missingIndex = new Index("missing", UUID.randomUUID().toString());
assertThat(getSkipShrinkStepPredicate(1).test(missingIndex, clusterState), is(true));
}
public void testToSteps() {
ShrinkAction action = createTestInstance();
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
List<Step> steps = action.toSteps(null, phase, nextStepKey);
-assertThat(steps.size(), equalTo(10));
+assertThat(steps.size(), equalTo(13));
-StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, BranchingStep.NAME);
+StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_SKIP_SHRINK_STEP);
StepKey expectedSecondKey = new StepKey(phase, ShrinkAction.NAME, WaitForNoFollowersStep.NAME);
StepKey expectedThirdKey = new StepKey(phase, ShrinkAction.NAME, ReadOnlyAction.NAME);
StepKey expectedFourthKey = new StepKey(phase, ShrinkAction.NAME, SetSingleNodeAllocateStep.NAME);
@@ -135,8 +188,11 @@
StepKey expectedSixthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkStep.NAME);
StepKey expectedSeventhKey = new StepKey(phase, ShrinkAction.NAME, ShrunkShardsAllocatedStep.NAME);
StepKey expectedEighthKey = new StepKey(phase, ShrinkAction.NAME, CopyExecutionStateStep.NAME);
-StepKey expectedNinthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME);
+StepKey expectedNinthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_DATASTREAM_CHECK_KEY);
-StepKey expectedTenthKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME);
+StepKey expectedTenthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME);
StepKey expectedEleventhKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME);
StepKey expectedTwelveKey = new StepKey(phase, ShrinkAction.NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey expectedThirteenKey = new StepKey(phase, ShrinkAction.NAME, DeleteStep.NAME);
assertTrue(steps.get(0) instanceof BranchingStep);
assertThat(steps.get(0).getKey(), equalTo(expectedFirstKey));
@@ -176,15 +232,30 @@
assertThat(steps.get(7).getNextStepKey(), equalTo(expectedNinthKey));
assertThat(((CopyExecutionStateStep) steps.get(7)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
-assertTrue(steps.get(8) instanceof ShrinkSetAliasStep);
+assertTrue(steps.get(8) instanceof BranchingStep);
assertThat(steps.get(8).getKey(), equalTo(expectedNinthKey));
-assertThat(steps.get(8).getNextStepKey(), equalTo(expectedTenthKey));
+expectThrows(IllegalStateException.class, () -> steps.get(8).getNextStepKey());
-assertThat(((ShrinkSetAliasStep) steps.get(8)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
+assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnFalse(), equalTo(expectedTenthKey));
assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnTrue(), equalTo(expectedTwelveKey));
-assertTrue(steps.get(9) instanceof ShrunkenIndexCheckStep);
+assertTrue(steps.get(9) instanceof ShrinkSetAliasStep);
assertThat(steps.get(9).getKey(), equalTo(expectedTenthKey));
-assertThat(steps.get(9).getNextStepKey(), equalTo(nextStepKey));
+assertThat(steps.get(9).getNextStepKey(), equalTo(expectedEleventhKey));
-assertThat(((ShrunkenIndexCheckStep) steps.get(9)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
+assertThat(((ShrinkSetAliasStep) steps.get(9)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
assertTrue(steps.get(10) instanceof ShrunkenIndexCheckStep);
assertThat(steps.get(10).getKey(), equalTo(expectedEleventhKey));
assertThat(steps.get(10).getNextStepKey(), equalTo(nextStepKey));
assertThat(((ShrunkenIndexCheckStep) steps.get(10)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
assertTrue(steps.get(11) instanceof ReplaceDataStreamBackingIndexStep);
assertThat(steps.get(11).getKey(), equalTo(expectedTwelveKey));
assertThat(steps.get(11).getNextStepKey(), equalTo(expectedThirteenKey));
assertThat(((ReplaceDataStreamBackingIndexStep) steps.get(11)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
assertTrue(steps.get(12) instanceof DeleteStep);
assertThat(steps.get(12).getKey(), equalTo(expectedThirteenKey));
assertThat(steps.get(12).getNextStepKey(), equalTo(expectedEleventhKey));
}
@Override

File: org/elasticsearch/xpack/TimeSeriesRestDriver.java

@@ -20,13 +20,20 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.ilm.AllocateAction;
import org.elasticsearch.xpack.core.ilm.DeleteAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.Step;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -124,4 +131,42 @@ public final class TimeSeriesRestDriver {
client.performRequest(createIndexTemplateRequest);
}
public static void rolloverMaxOneDocCondition(RestClient client, String indexAbstractionName) throws IOException {
Request rolloverRequest = new Request("POST", "/" + indexAbstractionName + "/_rollover");
rolloverRequest.setJsonEntity("{\n" +
" \"conditions\": {\n" +
" \"max_docs\": \"1\"\n" +
" }\n" +
"}"
);
client.performRequest(rolloverRequest);
}
public static void createFullPolicy(RestClient client, String policyName, TimeValue hotTime) throws IOException {
Map<String, LifecycleAction> hotActions = new HashMap<>();
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
hotActions.put(RolloverAction.NAME, new RolloverAction(null, null, 1L));
Map<String, LifecycleAction> warmActions = new HashMap<>();
warmActions.put(SetPriorityAction.NAME, new SetPriorityAction(50));
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null));
warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "integTest-1,integTest-2"), null, null));
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
Map<String, LifecycleAction> coldActions = new HashMap<>();
coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0));
coldActions.put(AllocateAction.NAME, new AllocateAction(0, singletonMap("_name", "integTest-3"), null, null));
Map<String, Phase> phases = new HashMap<>();
phases.put("hot", new Phase("hot", hotTime, hotActions));
phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
phases.put("cold", new Phase("cold", TimeValue.ZERO, coldActions));
phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policyName, phases);
// PUT policy
XContentBuilder builder = jsonBuilder();
lifecyclePolicy.toXContent(builder, null);
final StringEntity entity = new StringEntity(
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
Request request = new Request("PUT", "_ilm/policy/" + policyName);
request.setEntity(entity);
client.performRequest(request);
}
}

File: org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java

@@ -6,22 +6,35 @@
package org.elasticsearch.xpack.ilm;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
import static org.elasticsearch.xpack.core.ilm.ShrinkAction.CONDITIONAL_SKIP_SHRINK_STEP;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class TimeSeriesDataStreamsIT extends ESRestTestCase {
private static final String FAILED_STEP_RETRY_COUNT_FIELD = "failed_step_retry_count";
public void testRolloverAction() throws Exception {
String policyName = "logs-policy";
createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, 1L));
@@ -40,4 +53,56 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
public void testShrinkActionInPolicyWithoutHotPhase() throws Exception {
String policyName = "logs-policy";
createNewSingletonPolicy(client(), policyName, "warm", new ShrinkAction(1));
Settings settings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.build();
Template template = new Template(settings, null, null);
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
String dataStream = "logs-foo";
indexDocument(client(), dataStream, true);
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + backingIndexName;
assertBusy(() -> assertThat(
"original index must wait in the " + CONDITIONAL_SKIP_SHRINK_STEP + " until it is not the write index anymore",
(Integer) explainIndex(client(), backingIndexName).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)),
30, TimeUnit.SECONDS);
// Manual rollover the original index such that it's not the write index in the data stream anymore
rolloverMaxOneDocCondition(client(), dataStream);
assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
assertThat("the original index must've been deleted", indexExists(backingIndexName), is(false));
}
public void testShrinkAfterRollover() throws Exception {
String policyName = "logs-policy";
createFullPolicy(client(), policyName, TimeValue.ZERO);
Settings settings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.build();
Template template = new Template(settings, null, null);
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
String dataStream = "logs-foo";
indexDocument(client(), dataStream, true);
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
String rolloverIndex = DataStream.getDefaultBackingIndexName(dataStream, 2);
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + backingIndexName;
assertBusy(() -> assertTrue("the rollover action created the rollover index", indexExists(rolloverIndex)));
assertBusy(() -> assertFalse("the original index was deleted by the shrink action", indexExists(backingIndexName)),
60, TimeUnit.SECONDS);
assertBusy(() -> assertFalse("the shrunken index was deleted by the delete action", indexExists(shrunkenIndex)),
30, TimeUnit.SECONDS);
}
}

File: org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -67,11 +67,13 @@
import java.util.function.Supplier;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explain;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -116,7 +118,7 @@
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias));
// create policy
-createFullPolicy(TimeValue.ZERO);
+createFullPolicy(client(), policy, TimeValue.ZERO);
// update policy on index
updatePolicy(originalIndex, policy);
// index document {"foo": "bar"} to trigger rollover
@@ -144,7 +146,7 @@
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
// create policy
-createFullPolicy(TimeValue.timeValueHours(10));
+createFullPolicy(client(), policy, TimeValue.timeValueHours(10));
// update policy on index
updatePolicy(originalIndex, policy);
@@ -177,7 +179,7 @@
.put("index.routing.allocation.include._name", "integTest-0")
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias));
-createFullPolicy(TimeValue.timeValueHours(10));
+createFullPolicy(client(), policy, TimeValue.timeValueHours(10));
// update policy on index
updatePolicy(originalIndex, policy);
@@ -1041,7 +1043,7 @@
String nonexistantPolicyIndex = index + "-nonexistant-policy";
String unmanagedIndex = index + "-unmanaged";
-createFullPolicy(TimeValue.ZERO);
+createFullPolicy(client(), policy, TimeValue.ZERO);
{
// Create a "shrink-only-policy"
@@ -1094,7 +1096,7 @@
}
public void testExplainIndexContainsAutomaticRetriesInformation() throws Exception {
-createFullPolicy(TimeValue.ZERO);
+createFullPolicy(client(), policy, TimeValue.ZERO);
// create index without alias so the rollover action fails and is retried
createIndexWithSettingsNoAlias(index, Settings.builder()
@@ -1174,14 +1176,7 @@
client().performRequest(refreshOriginalIndex);
// Manual rollover
-Request rolloverRequest = new Request("POST", "/" + alias + "/_rollover");
-rolloverRequest.setJsonEntity("{\n" +
-" \"conditions\": {\n" +
-" \"max_docs\": \"1\"\n" +
-" }\n" +
-"}"
-);
-client().performRequest(rolloverRequest);
+rolloverMaxOneDocCondition(client(), alias);
assertBusy(() -> assertTrue(indexExists(secondIndex)));
// Index another document into the original index so the ILM rollover policy condition is met
@@ -1331,14 +1326,7 @@
// manual rollover the index so the "update-rollover-lifecycle-date" ILM step can continue and finish successfully as the index
// will have rollover information now
-Request rolloverRequest = new Request("POST", "/" + alias + "/_rollover");
-rolloverRequest.setJsonEntity("{\n" +
-" \"conditions\": {\n" +
-" \"max_docs\": \"1\"\n" +
-" }\n" +
-"}"
-);
-client().performRequest(rolloverRequest);
+rolloverMaxOneDocCondition(client(), alias);
assertBusy(() -> assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}
@@ -1813,34 +1801,6 @@
assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName());
}
-private void createFullPolicy(TimeValue hotTime) throws IOException {
-Map<String, LifecycleAction> hotActions = new HashMap<>();
-hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
-hotActions.put(RolloverAction.NAME, new RolloverAction(null, null, 1L));
-Map<String, LifecycleAction> warmActions = new HashMap<>();
-warmActions.put(SetPriorityAction.NAME, new SetPriorityAction(50));
-warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null));
-warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "integTest-1,integTest-2"), null, null));
-warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
-Map<String, LifecycleAction> coldActions = new HashMap<>();
-coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0));
-coldActions.put(AllocateAction.NAME, new AllocateAction(0, singletonMap("_name", "integTest-3"), null, null));
-Map<String, Phase> phases = new HashMap<>();
-phases.put("hot", new Phase("hot", hotTime, hotActions));
-phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
-phases.put("cold", new Phase("cold", TimeValue.ZERO, coldActions));
-phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
-LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
-// PUT policy
-XContentBuilder builder = jsonBuilder();
-lifecyclePolicy.toXContent(builder, null);
-final StringEntity entity = new StringEntity(
-"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
-Request request = new Request("PUT", "_ilm/policy/" + policy);
-request.setEntity(entity);
-assertOK(client().performRequest(request));
-}
private void createIndexWithSettingsNoAlias(String index, Settings.Builder settings) throws IOException {
Request request = new Request("PUT", "/" + index);
request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings.build())