From 3945712c727b0a397cc2eb6f47a2226e006b6dcd Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Tue, 9 Jun 2020 19:45:22 +0100 Subject: [PATCH] [7.x] ILM add data stream support to the Shrink action (#57616) (#57884) The shrink action creates a shrunken index with the target number of shards. This makes the shrink action data stream aware. If the ILM managed index is part of a data stream the shrink action will make sure to swap the original managed index with the shrunken one as part of the data stream's backing indices and then delete the original index. (cherry picked from commit 99aeed6acf4ae7cbdd97a3bcfe54c5d37ab7a574) Signed-off-by: Andrei Dan --- .../cluster/metadata/DataStream.java | 25 +++ .../cluster/metadata/DataStreamTests.java | 56 ++++++ .../xpack/core/ilm/BranchingStep.java | 27 +-- .../ReplaceDataStreamBackingIndexStep.java | 117 +++++++++++++ .../xpack/core/ilm/ShrinkAction.java | 77 +++++++- ...eplaceDataStreamBackingIndexStepTests.java | 164 ++++++++++++++++++ .../xpack/core/ilm/ShrinkActionTests.java | 93 ++++++++-- .../xpack/TimeSeriesRestDriver.java | 45 +++++ .../xpack/ilm/TimeSeriesDataStreamsIT.java | 65 +++++++ .../ilm/TimeSeriesLifecycleActionsIT.java | 58 +------ 10 files changed, 650 insertions(+), 77 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStep.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 8439f5f4a82..b3666608c92 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -102,6 +102,31 @@ public final class DataStream extends AbstractDiffable implements To return new DataStream(name, 
timeStampField, backingIndices, generation); } + /** + * Replaces the specified backing index with a new index and returns a new {@code DataStream} instance with + * the modified backing indices. An {@code IllegalArgumentException} is thrown if the index to be replaced + * is not a backing index for this data stream or if it is the {@code DataStream}'s write index. + * + * @param existingBackingIndex the backing index to be replaced + * @param newBackingIndex the new index that will be part of the {@code DataStream} + * @return new {@code DataStream} instance with backing indices that contain replacement index instead of the specified + * existing index. + */ + public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBackingIndex) { + List backingIndices = new ArrayList<>(indices); + int backingIndexPosition = backingIndices.indexOf(existingBackingIndex); + if (backingIndexPosition == -1) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] ", + existingBackingIndex.getName(), name)); + } + if (generation == (backingIndexPosition + 1)) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot replace backing index [%s] of data stream [%s] because " + + "it is the write index", existingBackingIndex.getName(), name)); + } + backingIndices.set(backingIndexPosition, newBackingIndex); + return new DataStream(name, timeStampField, backingIndices, generation); + } + /** * Generates the name of the index that conforms to the default naming convention for backing indices * on data streams given the specified data stream name and generation. 
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 95edb9ec8df..34d1afb26f0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -108,4 +108,60 @@ public class DataStreamTests extends AbstractSerializingTestCase { String expectedBackingIndexName = String.format(Locale.ROOT, ".ds-%s-%06d", dataStreamName, backingIndexNum); assertThat(defaultBackingIndexName, equalTo(expectedBackingIndexName)); } + + public void testReplaceBackingIndex() { + int numBackingIndices = randomIntBetween(2, 32); + int indexToReplace = randomIntBetween(1, numBackingIndices - 1) - 1; + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + List indices = new ArrayList<>(numBackingIndices); + for (int i = 1; i <= numBackingIndices; i++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random()))); + } + DataStream original = new DataStream(dataStreamName, "@timestamp", indices); + + Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random())); + DataStream updated = original.replaceBackingIndex(indices.get(indexToReplace), newBackingIndex); + assertThat(updated.getName(), equalTo(original.getName())); + assertThat(updated.getGeneration(), equalTo(original.getGeneration())); + assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField())); + assertThat(updated.getIndices().size(), equalTo(numBackingIndices)); + assertThat(updated.getIndices().get(indexToReplace), equalTo(newBackingIndex)); + + for (int i = 0; i < numBackingIndices; i++) { + if (i != indexToReplace) { + assertThat(updated.getIndices().get(i), equalTo(original.getIndices().get(i))); + } + } + } + + public void 
testReplaceBackingIndexThrowsExceptionIfIndexNotPartOfDataStream() { + int numBackingIndices = randomIntBetween(2, 32); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + List indices = new ArrayList<>(numBackingIndices); + for (int i = 1; i <= numBackingIndices; i++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random()))); + } + DataStream original = new DataStream(dataStreamName, "@timestamp", indices); + + Index standaloneIndex = new Index("index-foo", UUIDs.randomBase64UUID(random())); + Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random())); + expectThrows(IllegalArgumentException.class, () -> original.replaceBackingIndex(standaloneIndex, newBackingIndex)); + } + + public void testReplaceBackingIndexThrowsExceptionIfReplacingWriteIndex() { + int numBackingIndices = randomIntBetween(2, 32); + int writeIndexPosition = numBackingIndices - 1; + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + List indices = new ArrayList<>(numBackingIndices); + for (int i = 1; i <= numBackingIndices; i++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random()))); + } + DataStream original = new DataStream(dataStreamName, "@timestamp", indices); + + Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random())); + expectThrows(IllegalArgumentException.class, () -> original.replaceBackingIndex(indices.get(writeIndexPosition), newBackingIndex)); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/BranchingStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/BranchingStep.java index e3f93340a04..1e8ce2fcfee 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/BranchingStep.java +++ 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/BranchingStep.java @@ -49,17 +49,22 @@ public class BranchingStep extends ClusterStateActionStep { this.predicateValue = new SetOnce<>(); } - @Override - public ClusterState performAction(Index index, ClusterState clusterState) { - IndexMetadata indexMetadata = clusterState.metadata().index(index); - if (indexMetadata == null) { - // Index must have been since deleted, ignore it - logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName()); - return clusterState; - } - predicateValue.set(predicate.test(index, clusterState)); - return clusterState; - } + @Override + public boolean isRetryable() { + return true; + } + + @Override + public ClusterState performAction(Index index, ClusterState clusterState) { + IndexMetadata indexMetadata = clusterState.metadata().index(index); + if (indexMetadata == null) { + // Index must have been since deleted, ignore it + logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName()); + return clusterState; + } + predicateValue.set(predicate.test(index, clusterState)); + return clusterState; + } /** * This method returns the next step to execute based on the predicate. If diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStep.java new file mode 100644 index 00000000000..01737ed8666 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStep.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ilm; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.index.Index; + +import java.util.Locale; +import java.util.Objects; + +/** + * This step replaces a data stream backing index with the target index, as part of the data stream's backing indices. + * Eg. if data stream `foo-stream` is backed by indices [`foo-stream-000001`, `foo-stream-000002`] and we'd like to replace the first + * generation index, `foo-stream-000001`, with `shrink-foo-stream-000001`, after this step the `foo-stream` data stream will contain + * the following indices + *

+ * [`shrink-foo-stream-000001`, `foo-stream-000002`] + *

+ * The `foo-stream-000001` index will continue to exist but will not be part of the data stream anymore. + *

+ * As the last generation is the write index of the data stream, replacing the last generation index is not allowed. + *

+ * This is useful in scenarios following a restore from snapshot operation where the restored index will take the place of the source + * index in the ILM lifecycle or in the case where we shrink an index and the shrunk index will take the place of the original index. + */ +public class ReplaceDataStreamBackingIndexStep extends ClusterStateActionStep { + public static final String NAME = "replace-datastream-backing-index"; + private static final Logger logger = LogManager.getLogger(ReplaceDataStreamBackingIndexStep.class); + + private final String targetIndexPrefix; + + public ReplaceDataStreamBackingIndexStep(StepKey key, StepKey nextStepKey, String targetIndexPrefix) { + super(key, nextStepKey); + this.targetIndexPrefix = targetIndexPrefix; + } + + @Override + public boolean isRetryable() { + return true; + } + + public String getTargetIndexPrefix() { + return targetIndexPrefix; + } + + @Override + public ClusterState performAction(Index index, ClusterState clusterState) { + String originalIndex = index.getName(); + final String targetIndexName = targetIndexPrefix + originalIndex; + + IndexMetadata originalIndexMetadata = clusterState.metadata().index(index); + if (originalIndexMetadata == null) { + // Index must have been since deleted, skip the shrink action + logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", NAME, index.getName()); + return clusterState; + } + + String policyName = originalIndexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME); + IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName()); + assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found"; + IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream(); + if (dataStream == null) { + String errorMessage = String.format(Locale.ROOT, "index [%s] is not part of a data stream. 
stopping execution of lifecycle " + + "[%s] until the index is added to a data stream", originalIndex, policyName); + logger.debug(errorMessage); + throw new IllegalStateException(errorMessage); + } + + assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index"; + if (dataStream.getWriteIndex().getIndex().getName().equals(originalIndex)) { + String errorMessage = String.format(Locale.ROOT, "index [%s] is the write index for data stream [%s]. stopping execution of " + + "lifecycle [%s] as a data stream's write index cannot be replaced. manually rolling over the index will resume the " + + "execution of the policy as the index will not be the data stream's write index anymore", originalIndex, + dataStream.getName(), policyName); + logger.debug(errorMessage); + throw new IllegalStateException(errorMessage); + } + + IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName); + if (targetIndexMetadata == null) { + String errorMessage = String.format(Locale.ROOT, "target index [%s] doesn't exist. 
stopping execution of lifecycle [%s] for" + + " index [%s]", targetIndexName, policyName, originalIndex); + logger.debug(errorMessage); + throw new IllegalStateException(errorMessage); + } + + Metadata.Builder newMetaData = Metadata.builder(clusterState.getMetadata()) + .put(dataStream.getDataStream().replaceBackingIndex(index, targetIndexMetadata.getIndex())); + return ClusterState.builder(clusterState).metadata(newMetaData).build(); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), targetIndexPrefix); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ReplaceDataStreamBackingIndexStep other = (ReplaceDataStreamBackingIndexStep) obj; + return super.equals(obj) && + Objects.equals(targetIndexPrefix, other.targetIndexPrefix); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java index 62a09f027c7..a3e439788fd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java @@ -5,7 +5,11 @@ */ package org.elasticsearch.xpack.core.ilm; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -15,12 +19,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import 
org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Objects; +import java.util.function.BiPredicate; /** * A {@link LifecycleAction} which shrinks the index. @@ -29,6 +36,10 @@ public class ShrinkAction implements LifecycleAction { public static final String NAME = "shrink"; public static final String SHRUNKEN_INDEX_PREFIX = "shrink-"; public static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards"); + public static final String CONDITIONAL_SKIP_SHRINK_STEP = BranchingStep.NAME + "-check-prerequisites"; + public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; + + private static final Logger logger = LogManager.getLogger(ShrinkAction.class); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0])); @@ -85,7 +96,7 @@ public class ShrinkAction implements LifecycleAction { public List toSteps(Client client, String phase, Step.StepKey nextStepKey) { Settings readOnlySettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build(); - StepKey branchingKey = new StepKey(phase, NAME, BranchingStep.NAME); + StepKey preShrinkBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_SHRINK_STEP); StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME); StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME); StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME); @@ -93,23 +104,77 @@ public class ShrinkAction implements LifecycleAction { StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME); StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME); StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME); + StepKey 
dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY); StepKey aliasKey = new StepKey(phase, NAME, ShrinkSetAliasStep.NAME); StepKey isShrunkIndexKey = new StepKey(phase, NAME, ShrunkenIndexCheckStep.NAME); + StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME); + StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); - BranchingStep conditionalSkipShrinkStep = new BranchingStep(branchingKey, waitForNoFollowerStepKey, nextStepKey, - (index, clusterState) -> clusterState.getMetadata().index(index).getNumberOfShards() == numberOfShards); + BranchingStep conditionalSkipShrinkStep = new BranchingStep(preShrinkBranchingKey, waitForNoFollowerStepKey, nextStepKey, + getSkipShrinkStepPredicate(numberOfShards)); WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, readOnlyKey, client); UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, setSingleNodeKey, client, readOnlySettings); SetSingleNodeAllocateStep setSingleNodeStep = new SetSingleNodeAllocateStep(setSingleNodeKey, allocationRoutedKey, client); CheckShrinkReadyStep checkShrinkReadyStep = new CheckShrinkReadyStep(allocationRoutedKey, shrinkKey); ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, SHRUNKEN_INDEX_PREFIX); ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, copyMetadataKey, SHRUNKEN_INDEX_PREFIX); - CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, aliasKey, SHRUNKEN_INDEX_PREFIX, - ShrunkenIndexCheckStep.NAME); + CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, dataStreamCheckBranchingKey, + SHRUNKEN_INDEX_PREFIX, ShrunkenIndexCheckStep.NAME); + // by the time we get to this step we have 2 indices, the source and the shrunken one. 
we now need to choose an index + // swapping strategy such that the shrunken index takes the place of the source index (which is also deleted). + // if the source index is part of a data stream it's a matter of replacing it with the shrunken index one in the data stream and + // then deleting the source index; otherwise we'll use the alias management api to atomically transfer the aliases from source to + // the shrunken index and delete the source + BranchingStep isDataStreamBranchingStep = new BranchingStep(dataStreamCheckBranchingKey, aliasKey, replaceDataStreamIndexKey, + (index, clusterState) -> { + IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName()); + assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found"; + return indexAbstraction.getParentDataStream() != null; + }); ShrinkSetAliasStep aliasSwapAndDelete = new ShrinkSetAliasStep(aliasKey, isShrunkIndexKey, client, SHRUNKEN_INDEX_PREFIX); + ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(replaceDataStreamIndexKey, + deleteIndexKey, SHRUNKEN_INDEX_PREFIX); + DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, isShrunkIndexKey, client); ShrunkenIndexCheckStep waitOnShrinkTakeover = new ShrunkenIndexCheckStep(isShrunkIndexKey, nextStepKey, SHRUNKEN_INDEX_PREFIX); return Arrays.asList(conditionalSkipShrinkStep, waitForNoFollowersStep, readOnlyStep, setSingleNodeStep, checkShrinkReadyStep, - shrink, allocated, copyMetadata, aliasSwapAndDelete, waitOnShrinkTakeover); + shrink, allocated, copyMetadata, isDataStreamBranchingStep, aliasSwapAndDelete, waitOnShrinkTakeover, + replaceDataStreamBackingIndex, deleteSourceIndexStep); + } + + static BiPredicate getSkipShrinkStepPredicate(int targetNumberOfShards) { + return (index, clusterState) -> { + IndexMetadata indexMetadata = clusterState.getMetadata().index(index); + if (indexMetadata == null) { + 
// Index must have been since deleted, skip the shrink action + logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", NAME, index.getName()); + return true; + } + + if (indexMetadata.getNumberOfShards() == targetNumberOfShards) { + logger.debug("skipping [{}] lifecycle action for index [{}] because the index already has the target number of shards", + NAME, index.getName()); + return true; + } + + IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName()); + assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found"; + + if (indexAbstraction.getParentDataStream() != null) { + IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream(); + assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index"; + if (dataStream.getWriteIndex().getIndex().getName().equals(index.getName())) { + String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME); + String errorMessage = String.format(Locale.ROOT, + "index [%s] is the write index for data stream [%s]. stopping execution of lifecycle [%s] as a data stream's " + + "write index cannot be shrunk. 
manually rolling over the index will resume the execution of the policy " + + "as the index will not be the data stream's write index anymore", + index.getName(), dataStream.getName(), policyName); + logger.debug(errorMessage); + throw new IllegalStateException(errorMessage); + } + } + return false; + }; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java new file mode 100644 index 00000000000..15e15fcdb82 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.index.Index; + +import java.util.List; +import java.util.UUID; + +import static org.elasticsearch.xpack.core.ilm.AbstractStepMasterTimeoutTestCase.emptyClusterState; +import static org.hamcrest.Matchers.is; + +public class ReplaceDataStreamBackingIndexStepTests extends AbstractStepTestCase { + + @Override + protected ReplaceDataStreamBackingIndexStep createRandomInstance() { + return new ReplaceDataStreamBackingIndexStep(randomStepKey(), randomStepKey(), randomAlphaOfLengthBetween(1, 10)); + } + + @Override + protected ReplaceDataStreamBackingIndexStep mutateInstance(ReplaceDataStreamBackingIndexStep instance) { + 
Step.StepKey key = instance.getKey(); + Step.StepKey nextKey = instance.getNextStepKey(); + String indexPrefix = instance.getTargetIndexPrefix(); + + switch (between(0, 2)) { + case 0: + key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + break; + case 1: + nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + break; + case 2: + indexPrefix = randomValueOtherThan(indexPrefix, () -> randomAlphaOfLengthBetween(1, 10)); + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + return new ReplaceDataStreamBackingIndexStep(key, nextKey, indexPrefix); + } + + @Override + protected ReplaceDataStreamBackingIndexStep copyInstance(ReplaceDataStreamBackingIndexStep instance) { + return new ReplaceDataStreamBackingIndexStep(instance.getKey(), instance.getNextStepKey(), instance.getTargetIndexPrefix()); + } + + public void testPerformActionThrowsExceptionIfIndexIsNotPartOfDataStream() { + String indexName = randomAlphaOfLength(10); + String policyName = "test-ilm-policy"; + IndexMetadata sourceIndexMetadata = + IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata( + Metadata.builder().put(sourceIndexMetadata, true).build() + ).build(); + + expectThrows(IllegalStateException.class, + () -> createRandomInstance().performAction(sourceIndexMetadata.getIndex(), clusterState)); + } + + public void testPerformActionThrowsExceptionIfIndexIsTheDataStreamWriteIndex() { + String dataStreamName = randomAlphaOfLength(10); + String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String policyName = "test-ilm-policy"; + IndexMetadata sourceIndexMetadata = + 
IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata( + Metadata.builder().put(sourceIndexMetadata, true).put(new DataStream(dataStreamName, "timestamp", + org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex()))).build() + ).build(); + + expectThrows(IllegalStateException.class, + () -> createRandomInstance().performAction(sourceIndexMetadata.getIndex(), clusterState)); + } + + public void testPerformActionThrowsExceptionIfTargetIndexIsMissing() { + String dataStreamName = randomAlphaOfLength(10); + String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String policyName = "test-ilm-policy"; + IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)) + .build(); + + String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2); + IndexMetadata writeIndexMetadata = IndexMetadata.builder(writeIndexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)) + .build(); + + List backingIndices = org.elasticsearch.common.collect.List + .of(sourceIndexMetadata.getIndex(), writeIndexMetadata.getIndex()); + ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata( + Metadata.builder() + .put(sourceIndexMetadata, true) + .put(writeIndexMetadata, true) + .put(new DataStream(dataStreamName, "timestamp", backingIndices)) + .build() + ).build(); + + expectThrows(IllegalStateException.class, + () -> 
createRandomInstance().performAction(sourceIndexMetadata.getIndex(), clusterState)); + } + + public void testPerformActionIsNoOpIfIndexIsMissing() { + ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT).build(); + Index missingIndex = new Index("missing", UUID.randomUUID().toString()); + ReplaceDataStreamBackingIndexStep replaceSourceIndexStep = createRandomInstance(); + ClusterState newState = replaceSourceIndexStep.performAction(missingIndex, initialState); + assertThat(newState, is(initialState)); + } + + public void testPerformAction() { + String dataStreamName = randomAlphaOfLength(10); + String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String policyName = "test-ilm-policy"; + IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)) + .build(); + + String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2); + IndexMetadata writeIndexMetadata = IndexMetadata.builder(writeIndexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)) + .build(); + + String indexPrefix = "test-prefix-"; + String targetIndex = indexPrefix + indexName; + + IndexMetadata targetIndexMetadata = IndexMetadata.builder(targetIndex).settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + List backingIndices = org.elasticsearch.common.collect.List + .of(sourceIndexMetadata.getIndex(), writeIndexMetadata.getIndex()); + ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata( + Metadata.builder() + .put(sourceIndexMetadata, true) + .put(writeIndexMetadata, true) + .put(new DataStream(dataStreamName, "timestamp", 
backingIndices)) + .put(targetIndexMetadata, true) + .build() + ).build(); + + ReplaceDataStreamBackingIndexStep replaceSourceIndexStep = + new ReplaceDataStreamBackingIndexStep(randomStepKey(), randomStepKey(), indexPrefix); + ClusterState newState = replaceSourceIndexStep.performAction(sourceIndexMetadata.getIndex(), clusterState); + DataStream updatedDataStream = newState.metadata().dataStreams().get(dataStreamName); + assertThat(updatedDataStream.getIndices().size(), is(2)); + assertThat(updatedDataStream.getIndices().get(0), is(targetIndexMetadata.getIndex())); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java index 0e575f8482c..61270eb9f19 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java @@ -8,18 +8,23 @@ package org.elasticsearch.xpack.core.ilm; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.UUID; +import static org.elasticsearch.xpack.core.ilm.ShrinkAction.getSkipShrinkStepPredicate; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class ShrinkActionTests extends AbstractActionTestCase { @@ -116,18 +121,66 @@ public class 
ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> { .setStepTime(0L) .build().asMap()) .numberOfShards(numShards).numberOfReplicas(0))).build(); - ClusterState newState = step.performAction(state.metadata().index(indexName).getIndex(), state); + step.performAction(state.metadata().index(indexName).getIndex(), state); assertThat(step.getNextStepKey(), equalTo(steps.get(1).getKey())); } + public void testNoOpShrinkDoesntFailOnDataStreamWriteIndex() { + String dataStreamName = randomAlphaOfLength(10); + String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String policyName = "test-ilm-policy"; + IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(1).numberOfReplicas(1) + .build(); + + List<Index> backingIndices = org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex()); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata( + Metadata.builder() + .put(sourceIndexMetadata, true) + .put(new DataStream(dataStreamName, "timestamp", backingIndices)) + .build() + ).build(); + + boolean skipShrink = getSkipShrinkStepPredicate(1).test(sourceIndexMetadata.getIndex(), clusterState); + assertThat("shrink is skipped even though it is applied to a data stream's write index because it would be a no-op", + skipShrink, is(true)); + } + + public void testShrinkFailsOnDataStreamWriteIndex() { + String dataStreamName = randomAlphaOfLength(10); + String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String policyName = "test-ilm-policy"; + IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(5).numberOfReplicas(1) + .build(); + + List<Index> backingIndices = org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex()); + ClusterState clusterState =
ClusterState.builder(ClusterName.DEFAULT).metadata( + Metadata.builder() + .put(sourceIndexMetadata, true) + .put(new DataStream(dataStreamName, "timestamp", backingIndices)) + .build() + ).build(); + + expectThrows(IllegalStateException.class, () -> getSkipShrinkStepPredicate(1).test(sourceIndexMetadata.getIndex(), clusterState)); + } + + public void testShrinkSkipIfIndexIsDeleted() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); + Index missingIndex = new Index("missing", UUID.randomUUID().toString()); + assertThat(getSkipShrinkStepPredicate(1).test(missingIndex, clusterState), is(true)); + } + public void testToSteps() { ShrinkAction action = createTestInstance(); String phase = randomAlphaOfLengthBetween(1, 10); StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)); List<Step> steps = action.toSteps(null, phase, nextStepKey); - assertThat(steps.size(), equalTo(10)); - StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, BranchingStep.NAME); + assertThat(steps.size(), equalTo(13)); + StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_SKIP_SHRINK_STEP); StepKey expectedSecondKey = new StepKey(phase, ShrinkAction.NAME, WaitForNoFollowersStep.NAME); StepKey expectedThirdKey = new StepKey(phase, ShrinkAction.NAME, ReadOnlyAction.NAME); StepKey expectedFourthKey = new StepKey(phase, ShrinkAction.NAME, SetSingleNodeAllocateStep.NAME); @@ -135,8 +188,11 @@ public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> { StepKey expectedSixthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkStep.NAME); StepKey expectedSeventhKey = new StepKey(phase, ShrinkAction.NAME, ShrunkShardsAllocatedStep.NAME); StepKey expectedEighthKey = new StepKey(phase, ShrinkAction.NAME, CopyExecutionStateStep.NAME); - StepKey expectedNinthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME); - StepKey
expectedTenthKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME); + StepKey expectedNinthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_DATASTREAM_CHECK_KEY); + StepKey expectedTenthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME); + StepKey expectedEleventhKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME); + StepKey expectedTwelveKey = new StepKey(phase, ShrinkAction.NAME, ReplaceDataStreamBackingIndexStep.NAME); + StepKey expectedThirteenKey = new StepKey(phase, ShrinkAction.NAME, DeleteStep.NAME); assertTrue(steps.get(0) instanceof BranchingStep); assertThat(steps.get(0).getKey(), equalTo(expectedFirstKey)); @@ -176,15 +232,30 @@ public class ShrinkActionTests extends AbstractActionTestCase { assertThat(steps.get(7).getNextStepKey(), equalTo(expectedNinthKey)); assertThat(((CopyExecutionStateStep) steps.get(7)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX)); - assertTrue(steps.get(8) instanceof ShrinkSetAliasStep); + assertTrue(steps.get(8) instanceof BranchingStep); assertThat(steps.get(8).getKey(), equalTo(expectedNinthKey)); - assertThat(steps.get(8).getNextStepKey(), equalTo(expectedTenthKey)); - assertThat(((ShrinkSetAliasStep) steps.get(8)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX)); + expectThrows(IllegalStateException.class, () -> steps.get(8).getNextStepKey()); + assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnFalse(), equalTo(expectedTenthKey)); + assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnTrue(), equalTo(expectedTwelveKey)); - assertTrue(steps.get(9) instanceof ShrunkenIndexCheckStep); + assertTrue(steps.get(9) instanceof ShrinkSetAliasStep); assertThat(steps.get(9).getKey(), equalTo(expectedTenthKey)); - assertThat(steps.get(9).getNextStepKey(), equalTo(nextStepKey)); - assertThat(((ShrunkenIndexCheckStep) steps.get(9)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX)); + 
assertThat(steps.get(9).getNextStepKey(), equalTo(expectedEleventhKey)); + assertThat(((ShrinkSetAliasStep) steps.get(9)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX)); + + assertTrue(steps.get(10) instanceof ShrunkenIndexCheckStep); + assertThat(steps.get(10).getKey(), equalTo(expectedEleventhKey)); + assertThat(steps.get(10).getNextStepKey(), equalTo(nextStepKey)); + assertThat(((ShrunkenIndexCheckStep) steps.get(10)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX)); + + assertTrue(steps.get(11) instanceof ReplaceDataStreamBackingIndexStep); + assertThat(steps.get(11).getKey(), equalTo(expectedTwelveKey)); + assertThat(steps.get(11).getNextStepKey(), equalTo(expectedThirteenKey)); + assertThat(((ReplaceDataStreamBackingIndexStep) steps.get(11)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX)); + + assertTrue(steps.get(12) instanceof DeleteStep); + assertThat(steps.get(12).getKey(), equalTo(expectedThirteenKey)); + assertThat(steps.get(12).getNextStepKey(), equalTo(expectedEleventhKey)); } @Override diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java index 96aa70e24a8..37af88ed387 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java @@ -20,13 +20,20 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ilm.AllocateAction; +import org.elasticsearch.xpack.core.ilm.DeleteAction; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; import org.elasticsearch.xpack.core.ilm.LifecycleAction; 
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.SetPriorityAction; +import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.Step; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -124,4 +131,42 @@ public final class TimeSeriesRestDriver { client.performRequest(createIndexTemplateRequest); } + public static void rolloverMaxOneDocCondition(RestClient client, String indexAbstractionName) throws IOException { + Request rolloverRequest = new Request("POST", "/" + indexAbstractionName + "/_rollover"); + rolloverRequest.setJsonEntity("{\n" + + " \"conditions\": {\n" + + " \"max_docs\": \"1\"\n" + + " }\n" + + "}" + ); + client.performRequest(rolloverRequest); + } + + public static void createFullPolicy(RestClient client, String policyName, TimeValue hotTime) throws IOException { + Map<String, LifecycleAction> hotActions = new HashMap<>(); + hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100)); + hotActions.put(RolloverAction.NAME, new RolloverAction(null, null, 1L)); + Map<String, LifecycleAction> warmActions = new HashMap<>(); + warmActions.put(SetPriorityAction.NAME, new SetPriorityAction(50)); + warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null)); + warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "integTest-1,integTest-2"), null, null)); + warmActions.put(ShrinkAction.NAME, new ShrinkAction(1)); + Map<String, LifecycleAction> coldActions = new HashMap<>(); + coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0)); + coldActions.put(AllocateAction.NAME, new AllocateAction(0, singletonMap("_name", "integTest-3"), null, null)); + Map<String, Phase> phases = new HashMap<>(); + phases.put("hot", new Phase("hot", hotTime, hotActions)); + phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions)); +
phases.put("cold", new Phase("cold", TimeValue.ZERO, coldActions)); + phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction()))); + LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policyName, phases); + // PUT policy + XContentBuilder builder = jsonBuilder(); + lifecyclePolicy.toXContent(builder, null); + final StringEntity entity = new StringEntity( + "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON); + Request request = new Request("PUT", "_ilm/policy/" + policyName); + request.setEntity(entity); + client.performRequest(request); + } } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java index 1fea6b9c228..94005ea0f6c 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java @@ -6,22 +6,35 @@ package org.elasticsearch.xpack.ilm; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.ShrinkAction; + +import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; 
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; +import static org.elasticsearch.xpack.core.ilm.ShrinkAction.CONDITIONAL_SKIP_SHRINK_STEP; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; public class TimeSeriesDataStreamsIT extends ESRestTestCase { + private static final String FAILED_STEP_RETRY_COUNT_FIELD = "failed_step_retry_count"; + public void testRolloverAction() throws Exception { String policyName = "logs-policy"; createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, 1L)); @@ -40,4 +53,56 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase { equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } + public void testShrinkActionInPolicyWithoutHotPhase() throws Exception { + String policyName = "logs-policy"; + createNewSingletonPolicy(client(), policyName, "warm", new ShrinkAction(1)); + + Settings settings = Settings.builder() + .put(LifecycleSettings.LIFECYCLE_NAME, policyName) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .build(); + Template template = new Template(settings, null, null); + createComposableTemplate(client(), "logs-template", "logs-foo*", template); + + String dataStream = "logs-foo"; + indexDocument(client(), dataStream, true); + + String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1); + String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + backingIndexName; + assertBusy(() -> assertThat( + "original index must wait in the " + CONDITIONAL_SKIP_SHRINK_STEP + " until it is not the write index anymore", + (Integer) explainIndex(client(), backingIndexName).get(FAILED_STEP_RETRY_COUNT_FIELD), 
greaterThanOrEqualTo(1)), + 30, TimeUnit.SECONDS); + + // Manual rollover the original index such that it's not the write index in the data stream anymore + rolloverMaxOneDocCondition(client(), dataStream); + + assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); + assertThat("the original index must've been deleted", indexExists(backingIndexName), is(false)); + } + + public void testShrinkAfterRollover() throws Exception { + String policyName = "logs-policy"; + createFullPolicy(client(), policyName, TimeValue.ZERO); + + Settings settings = Settings.builder() + .put(LifecycleSettings.LIFECYCLE_NAME, policyName) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .build(); + Template template = new Template(settings, null, null); + createComposableTemplate(client(), "logs-template", "logs-foo*", template); + + String dataStream = "logs-foo"; + indexDocument(client(), dataStream, true); + + String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1); + String rolloverIndex = DataStream.getDefaultBackingIndexName(dataStream, 2); + String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + backingIndexName; + assertBusy(() -> assertTrue("the rollover action created the rollover index", indexExists(rolloverIndex))); + assertBusy(() -> assertFalse("the original index was deleted by the shrink action", indexExists(backingIndexName)), + 60, TimeUnit.SECONDS); + assertBusy(() -> assertFalse("the shrunken index was deleted by the delete action", indexExists(shrunkenIndex)), + 30, TimeUnit.SECONDS); + } } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 9f9647b384c..2984bc3b0c6 100644 --- 
a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -67,11 +67,13 @@ import java.util.function.Supplier; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; import static org.elasticsearch.xpack.TimeSeriesRestDriver.explain; import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -116,7 +118,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)); // create policy - createFullPolicy(TimeValue.ZERO); + createFullPolicy(client(), policy, TimeValue.ZERO); // update policy on index updatePolicy(originalIndex, policy); // index document {"foo": "bar"} to trigger rollover @@ -144,7 +146,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias")); // create policy - createFullPolicy(TimeValue.timeValueHours(10)); + createFullPolicy(client(), policy, TimeValue.timeValueHours(10)); // update policy on index updatePolicy(originalIndex, policy); @@ -177,7 +179,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { .put("index.routing.allocation.include._name", "integTest-0") .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, 
alias)); - createFullPolicy(TimeValue.timeValueHours(10)); + createFullPolicy(client(), policy, TimeValue.timeValueHours(10)); // update policy on index updatePolicy(originalIndex, policy); @@ -1041,7 +1043,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { String nonexistantPolicyIndex = index + "-nonexistant-policy"; String unmanagedIndex = index + "-unmanaged"; - createFullPolicy(TimeValue.ZERO); + createFullPolicy(client(), policy, TimeValue.ZERO); { // Create a "shrink-only-policy" @@ -1094,7 +1096,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { } public void testExplainIndexContainsAutomaticRetriesInformation() throws Exception { - createFullPolicy(TimeValue.ZERO); + createFullPolicy(client(), policy, TimeValue.ZERO); // create index without alias so the rollover action fails and is retried createIndexWithSettingsNoAlias(index, Settings.builder() @@ -1174,14 +1176,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { client().performRequest(refreshOriginalIndex); // Manual rollover - Request rolloverRequest = new Request("POST", "/" + alias + "/_rollover"); - rolloverRequest.setJsonEntity("{\n" + - " \"conditions\": {\n" + - " \"max_docs\": \"1\"\n" + - " }\n" + - "}" - ); - client().performRequest(rolloverRequest); + rolloverMaxOneDocCondition(client(), alias); assertBusy(() -> assertTrue(indexExists(secondIndex))); // Index another document into the original index so the ILM rollover policy condition is met @@ -1331,14 +1326,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { // manual rollover the index so the "update-rollover-lifecycle-date" ILM step can continue and finish successfully as the index // will have rollover information now - Request rolloverRequest = new Request("POST", "/" + alias + "/_rollover"); - rolloverRequest.setJsonEntity("{\n" + - " \"conditions\": {\n" + - " \"max_docs\": \"1\"\n" + - " }\n" + - "}" - ); - client().performRequest(rolloverRequest); + 
rolloverMaxOneDocCondition(client(), alias); assertBusy(() -> assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } @@ -1813,34 +1801,6 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName()); } - private void createFullPolicy(TimeValue hotTime) throws IOException { - Map<String, LifecycleAction> hotActions = new HashMap<>(); - hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100)); - hotActions.put(RolloverAction.NAME, new RolloverAction(null, null, 1L)); - Map<String, LifecycleAction> warmActions = new HashMap<>(); - warmActions.put(SetPriorityAction.NAME, new SetPriorityAction(50)); - warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null)); - warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "integTest-1,integTest-2"), null, null)); - warmActions.put(ShrinkAction.NAME, new ShrinkAction(1)); - Map<String, LifecycleAction> coldActions = new HashMap<>(); - coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0)); - coldActions.put(AllocateAction.NAME, new AllocateAction(0, singletonMap("_name", "integTest-3"), null, null)); - Map<String, Phase> phases = new HashMap<>(); - phases.put("hot", new Phase("hot", hotTime, hotActions)); - phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions)); - phases.put("cold", new Phase("cold", TimeValue.ZERO, coldActions)); - phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction()))); - LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases); - // PUT policy - XContentBuilder builder = jsonBuilder(); - lifecyclePolicy.toXContent(builder, null); - final StringEntity entity = new StringEntity( - "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON); - Request request = new Request("PUT", "_ilm/policy/" + policy); - request.setEntity(entity); - assertOK(client().performRequest(request)); - } - - private void
createIndexWithSettingsNoAlias(String index, Settings.Builder settings) throws IOException { Request request = new Request("PUT", "/" + index); request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings.build())