This changes the actions that would attempt to make the managed index read only to check if the managed index is the write index of a data stream before proceeding. The updated actions are shrink, readonly, freeze and forcemerge. (cherry picked from commit c906f631833fee8628f898917a8613a1f436c6b1) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
abc72c1a27
commit
caa5d3abe0
|
@ -120,6 +120,7 @@ public class ForceMergeAction implements LifecycleAction {
|
|||
|
||||
final boolean codecChange = codec != null && codec.equals(CodecService.BEST_COMPRESSION_CODEC);
|
||||
|
||||
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME);
|
||||
|
||||
StepKey closeKey = new StepKey(phase, NAME, CloseIndexStep.NAME);
|
||||
|
@ -130,6 +131,8 @@ public class ForceMergeAction implements LifecycleAction {
|
|||
StepKey forceMergeKey = new StepKey(phase, NAME, ForceMergeStep.NAME);
|
||||
StepKey countKey = new StepKey(phase, NAME, SegmentCountStep.NAME);
|
||||
|
||||
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
|
||||
readOnlyKey);
|
||||
UpdateSettingsStep readOnlyStep =
|
||||
new UpdateSettingsStep(readOnlyKey, codecChange ? closeKey : forceMergeKey, client, readOnlySettings);
|
||||
|
||||
|
@ -144,6 +147,7 @@ public class ForceMergeAction implements LifecycleAction {
|
|||
SegmentCountStep segmentCountStep = new SegmentCountStep(countKey, nextStepKey, client, maxNumSegments);
|
||||
|
||||
List<Step> mergeSteps = new ArrayList<>();
|
||||
mergeSteps.add(checkNotWriteIndexStep);
|
||||
mergeSteps.add(readOnlyStep);
|
||||
|
||||
if (codecChange) {
|
||||
|
|
|
@ -59,9 +59,13 @@ public class FreezeAction implements LifecycleAction {
|
|||
|
||||
@Override
|
||||
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
|
||||
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey freezeStepKey = new StepKey(phase, NAME, FreezeStep.NAME);
|
||||
|
||||
CheckNotDataStreamWriteIndexStep checkNoWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
|
||||
freezeStepKey);
|
||||
FreezeStep freezeStep = new FreezeStep(freezeStepKey, nextStepKey, client);
|
||||
return Arrays.asList(freezeStep);
|
||||
return Arrays.asList(checkNoWriteIndexStep, freezeStep);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -14,9 +14,10 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -59,10 +60,14 @@ public class ReadOnlyAction implements LifecycleAction {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
|
||||
Step.StepKey key = new Step.StepKey(phase, NAME, NAME);
|
||||
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
|
||||
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey readOnlyKey = new StepKey(phase, NAME, NAME);
|
||||
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
|
||||
readOnlyKey);
|
||||
Settings readOnlySettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
|
||||
return Collections.singletonList(new UpdateSettingsStep(key, nextStepKey, client, readOnlySettings));
|
||||
UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, nextStepKey, client, readOnlySettings);
|
||||
return Arrays.asList(checkNotWriteIndexStep, readOnlyStep);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -8,7 +8,6 @@ package org.elasticsearch.xpack.core.ilm;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -19,15 +18,12 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiPredicate;
|
||||
|
||||
/**
|
||||
* A {@link LifecycleAction} which shrinks the index.
|
||||
|
@ -97,6 +93,7 @@ public class ShrinkAction implements LifecycleAction {
|
|||
Settings readOnlySettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
|
||||
|
||||
StepKey preShrinkBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_SHRINK_STEP);
|
||||
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
|
||||
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME);
|
||||
StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME);
|
||||
|
@ -110,8 +107,10 @@ public class ShrinkAction implements LifecycleAction {
|
|||
StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
|
||||
StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
|
||||
|
||||
BranchingStep conditionalSkipShrinkStep = new BranchingStep(preShrinkBranchingKey, waitForNoFollowerStepKey, nextStepKey,
|
||||
getSkipShrinkStepPredicate(numberOfShards));
|
||||
BranchingStep conditionalSkipShrinkStep = new BranchingStep(preShrinkBranchingKey, checkNotWriteIndex, nextStepKey,
|
||||
(index, clusterState) -> clusterState.getMetadata().index(index).getNumberOfShards() == numberOfShards);
|
||||
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
|
||||
waitForNoFollowerStepKey);
|
||||
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, readOnlyKey, client);
|
||||
UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, setSingleNodeKey, client, readOnlySettings);
|
||||
SetSingleNodeAllocateStep setSingleNodeStep = new SetSingleNodeAllocateStep(setSingleNodeKey, allocationRoutedKey, client);
|
||||
|
@ -136,45 +135,11 @@ public class ShrinkAction implements LifecycleAction {
|
|||
deleteIndexKey, SHRUNKEN_INDEX_PREFIX);
|
||||
DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, isShrunkIndexKey, client);
|
||||
ShrunkenIndexCheckStep waitOnShrinkTakeover = new ShrunkenIndexCheckStep(isShrunkIndexKey, nextStepKey, SHRUNKEN_INDEX_PREFIX);
|
||||
return Arrays.asList(conditionalSkipShrinkStep, waitForNoFollowersStep, readOnlyStep, setSingleNodeStep, checkShrinkReadyStep,
|
||||
shrink, allocated, copyMetadata, isDataStreamBranchingStep, aliasSwapAndDelete, waitOnShrinkTakeover,
|
||||
return Arrays.asList(conditionalSkipShrinkStep, checkNotWriteIndexStep, waitForNoFollowersStep, readOnlyStep, setSingleNodeStep,
|
||||
checkShrinkReadyStep, shrink, allocated, copyMetadata, isDataStreamBranchingStep, aliasSwapAndDelete, waitOnShrinkTakeover,
|
||||
replaceDataStreamBackingIndex, deleteSourceIndexStep);
|
||||
}
|
||||
|
||||
static BiPredicate<Index, ClusterState> getSkipShrinkStepPredicate(int targetNumberOfShards) {
|
||||
return (index, clusterState) -> {
|
||||
IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
|
||||
if (indexMetadata == null) {
|
||||
// Index must have been since deleted, skip the shrink action
|
||||
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", NAME, index.getName());
|
||||
return true;
|
||||
}
|
||||
|
||||
if (indexMetadata.getNumberOfShards() == targetNumberOfShards) {
|
||||
logger.debug("skipping [{}] lifecycle action for index [{}] because the index already has the target number of shards",
|
||||
NAME, index.getName());
|
||||
return true;
|
||||
}
|
||||
|
||||
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
|
||||
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
|
||||
|
||||
if (indexAbstraction.getParentDataStream() != null) {
|
||||
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
|
||||
assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index";
|
||||
if (dataStream.getWriteIndex().getIndex().equals(index)) {
|
||||
String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
|
||||
String errorMessage = String.format(Locale.ROOT, "index [%s] is the write index for data stream [%s], pausing " +
|
||||
"ILM execution of lifecycle [%s] until this index is no longer the write index for the data stream via manual or " +
|
||||
"automated rollover", index.getName(), dataStream.getName(), policyName);
|
||||
logger.debug(errorMessage);
|
||||
throw new IllegalStateException(errorMessage);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
|
|
@ -65,17 +65,21 @@ public class ForceMergeActionTests extends AbstractActionTestCase<ForceMergeActi
|
|||
StepKey nextStepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
List<Step> steps = instance.toSteps(null, phase, nextStepKey);
|
||||
assertNotNull(steps);
|
||||
assertEquals(3, steps.size());
|
||||
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
|
||||
ForceMergeStep secondStep = (ForceMergeStep) steps.get(1);
|
||||
SegmentCountStep thirdStep = (SegmentCountStep) steps.get(2);
|
||||
assertThat(firstStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ReadOnlyAction.NAME)));
|
||||
assertThat(firstStep.getNextStepKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ForceMergeStep.NAME)));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(firstStep.getSettings()));
|
||||
assertThat(secondStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ForceMergeStep.NAME)));
|
||||
assertThat(secondStep.getNextStepKey(), equalTo(thirdStep.getKey()));
|
||||
assertThat(thirdStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, SegmentCountStep.NAME)));
|
||||
assertThat(thirdStep.getNextStepKey(), equalTo(nextStepKey));
|
||||
assertEquals(4, steps.size());
|
||||
CheckNotDataStreamWriteIndexStep firstStep = (CheckNotDataStreamWriteIndexStep) steps.get(0);
|
||||
UpdateSettingsStep secondStep = (UpdateSettingsStep) steps.get(1);
|
||||
ForceMergeStep thirdStep = (ForceMergeStep) steps.get(2);
|
||||
SegmentCountStep fourthStep = (SegmentCountStep) steps.get(3);
|
||||
|
||||
assertThat(firstStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, CheckNotDataStreamWriteIndexStep.NAME)));
|
||||
assertThat(firstStep.getNextStepKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ReadOnlyAction.NAME)));
|
||||
assertThat(secondStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ReadOnlyAction.NAME)));
|
||||
assertThat(secondStep.getNextStepKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ForceMergeStep.NAME)));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(secondStep.getSettings()));
|
||||
assertThat(thirdStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, ForceMergeStep.NAME)));
|
||||
assertThat(thirdStep.getNextStepKey(), equalTo(fourthStep.getKey()));
|
||||
assertThat(fourthStep.getKey(), equalTo(new StepKey(phase, ForceMergeAction.NAME, SegmentCountStep.NAME)));
|
||||
assertThat(fourthStep.getNextStepKey(), equalTo(nextStepKey));
|
||||
}
|
||||
|
||||
private void assertBestCompression(ForceMergeAction instance) {
|
||||
|
@ -83,10 +87,11 @@ public class ForceMergeActionTests extends AbstractActionTestCase<ForceMergeActi
|
|||
StepKey nextStepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
List<Step> steps = instance.toSteps(null, phase, nextStepKey);
|
||||
assertNotNull(steps);
|
||||
assertEquals(7, steps.size());
|
||||
assertEquals(8, steps.size());
|
||||
List<Tuple<StepKey, StepKey>> stepKeys = steps.stream()
|
||||
.map(s -> new Tuple<>(s.getKey(), s.getNextStepKey()))
|
||||
.collect(Collectors.toList());
|
||||
StepKey checkNotWriteIndex = new StepKey(phase, ForceMergeAction.NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey readOnly = new StepKey(phase, ForceMergeAction.NAME, ReadOnlyAction.NAME);
|
||||
StepKey closeIndex = new StepKey(phase, ForceMergeAction.NAME, CloseIndexStep.NAME);
|
||||
StepKey updateCodec = new StepKey(phase, ForceMergeAction.NAME, UpdateSettingsStep.NAME);
|
||||
|
@ -95,6 +100,7 @@ public class ForceMergeActionTests extends AbstractActionTestCase<ForceMergeActi
|
|||
StepKey forceMerge = new StepKey(phase, ForceMergeAction.NAME, ForceMergeStep.NAME);
|
||||
StepKey segmentCount = new StepKey(phase, ForceMergeAction.NAME, SegmentCountStep.NAME);
|
||||
assertThat(stepKeys, contains(
|
||||
new Tuple<>(checkNotWriteIndex, readOnly),
|
||||
new Tuple<>(readOnly, closeIndex),
|
||||
new Tuple<>(closeIndex, updateCodec),
|
||||
new Tuple<>(updateCodec, openIndex),
|
||||
|
@ -103,11 +109,11 @@ public class ForceMergeActionTests extends AbstractActionTestCase<ForceMergeActi
|
|||
new Tuple<>(forceMerge, segmentCount),
|
||||
new Tuple<>(segmentCount, nextStepKey)));
|
||||
|
||||
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
|
||||
UpdateSettingsStep thirdStep = (UpdateSettingsStep) steps.get(2);
|
||||
UpdateSettingsStep secondStep = (UpdateSettingsStep) steps.get(1);
|
||||
UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3);
|
||||
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(firstStep.getSettings()));
|
||||
assertThat(thirdStep.getSettings().get(EngineConfig.INDEX_CODEC_SETTING.getKey()), equalTo(CodecService.BEST_COMPRESSION_CODEC));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(secondStep.getSettings()));
|
||||
assertThat(fourthStep.getSettings().get(EngineConfig.INDEX_CODEC_SETTING.getKey()), equalTo(CodecService.BEST_COMPRESSION_CODEC));
|
||||
}
|
||||
|
||||
public void testMissingMaxNumSegments() throws IOException {
|
||||
|
|
|
@ -12,6 +12,8 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class FreezeActionTests extends AbstractActionTestCase<FreezeAction> {
|
||||
|
||||
@Override
|
||||
|
@ -36,10 +38,17 @@ public class FreezeActionTests extends AbstractActionTestCase<FreezeAction> {
|
|||
randomAlphaOfLengthBetween(1, 10));
|
||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||
assertNotNull(steps);
|
||||
assertEquals(1, steps.size());
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, FreezeAction.NAME, FreezeStep.NAME);
|
||||
FreezeStep firstStep = (FreezeStep) steps.get(0);
|
||||
assertEquals(expectedFirstStepKey, firstStep.getKey());
|
||||
assertEquals(nextStepKey, firstStep.getNextStepKey());
|
||||
assertEquals(2, steps.size());
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, FreezeAction.NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey expectedSecondStepKey = new StepKey(phase, FreezeAction.NAME, FreezeStep.NAME);
|
||||
|
||||
CheckNotDataStreamWriteIndexStep firstStep = (CheckNotDataStreamWriteIndexStep) steps.get(0);
|
||||
FreezeStep secondStep = (FreezeStep) steps.get(1);
|
||||
|
||||
assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey));
|
||||
assertThat(firstStep.getNextStepKey(), equalTo(expectedSecondStepKey));
|
||||
|
||||
assertEquals(expectedSecondStepKey, secondStep.getKey());
|
||||
assertEquals(nextStepKey, secondStep.getNextStepKey());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,13 +38,19 @@ public class ReadOnlyActionTests extends AbstractActionTestCase<ReadOnlyAction>
|
|||
randomAlphaOfLengthBetween(1, 10));
|
||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||
assertNotNull(steps);
|
||||
assertEquals(1, steps.size());
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, ReadOnlyAction.NAME, ReadOnlyAction.NAME);
|
||||
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
|
||||
assertEquals(2, steps.size());
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, ReadOnlyAction.NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey expectedSecondStepKey = new StepKey(phase, ReadOnlyAction.NAME, ReadOnlyAction.NAME);
|
||||
CheckNotDataStreamWriteIndexStep firstStep = (CheckNotDataStreamWriteIndexStep) steps.get(0);
|
||||
UpdateSettingsStep secondStep = (UpdateSettingsStep) steps.get(1);
|
||||
|
||||
assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey));
|
||||
assertThat(firstStep.getNextStepKey(), equalTo(nextStepKey));
|
||||
assertThat(firstStep.getSettings().size(), equalTo(1));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(firstStep.getSettings()));
|
||||
assertThat(firstStep.getNextStepKey(), equalTo(expectedSecondStepKey));
|
||||
|
||||
assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey));
|
||||
assertThat(secondStep.getNextStepKey(), equalTo(nextStepKey));
|
||||
assertThat(secondStep.getSettings().size(), equalTo(1));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(secondStep.getSettings()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,23 +8,18 @@ package org.elasticsearch.xpack.core.ilm;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.ShrinkAction.getSkipShrinkStepPredicate;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
|
||||
|
||||
|
@ -125,74 +120,27 @@ public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
|
|||
assertThat(step.getNextStepKey(), equalTo(steps.get(1).getKey()));
|
||||
}
|
||||
|
||||
public void testNoOpShrinkDoesntFailOnDataStreamWriteIndex() {
|
||||
String dataStreamName = randomAlphaOfLength(10);
|
||||
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
|
||||
String policyName = "test-ilm-policy";
|
||||
IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName)
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
|
||||
.numberOfShards(1).numberOfReplicas(1)
|
||||
.build();
|
||||
|
||||
List<Index> backingIndices = org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex());
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(
|
||||
Metadata.builder()
|
||||
.put(sourceIndexMetadata, true)
|
||||
.put(new DataStream(dataStreamName, "timestamp", backingIndices))
|
||||
.build()
|
||||
).build();
|
||||
|
||||
boolean skipShrink = getSkipShrinkStepPredicate(1).test(sourceIndexMetadata.getIndex(), clusterState);
|
||||
assertThat("shrink is skipped even though it is applied to a data stream's write index because it would be a no-op",
|
||||
skipShrink, is(true));
|
||||
}
|
||||
|
||||
public void testShrinkFailsOnDataStreamWriteIndex() {
|
||||
String dataStreamName = randomAlphaOfLength(10);
|
||||
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
|
||||
String policyName = "test-ilm-policy";
|
||||
IndexMetadata sourceIndexMetadata = IndexMetadata.builder(indexName)
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
|
||||
.numberOfShards(5).numberOfReplicas(1)
|
||||
.build();
|
||||
|
||||
List<Index> backingIndices = org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex());
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(
|
||||
Metadata.builder()
|
||||
.put(sourceIndexMetadata, true)
|
||||
.put(new DataStream(dataStreamName, "timestamp", backingIndices))
|
||||
.build()
|
||||
).build();
|
||||
|
||||
expectThrows(IllegalStateException.class, () -> getSkipShrinkStepPredicate(1).test(sourceIndexMetadata.getIndex(), clusterState));
|
||||
}
|
||||
|
||||
public void testShrinkSkipIfIndexIsDeleted() {
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
|
||||
Index missingIndex = new Index("missing", UUID.randomUUID().toString());
|
||||
assertThat(getSkipShrinkStepPredicate(1).test(missingIndex, clusterState), is(true));
|
||||
}
|
||||
|
||||
public void testToSteps() {
|
||||
ShrinkAction action = createTestInstance();
|
||||
String phase = randomAlphaOfLengthBetween(1, 10);
|
||||
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
|
||||
randomAlphaOfLengthBetween(1, 10));
|
||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||
assertThat(steps.size(), equalTo(13));
|
||||
assertThat(steps.size(), equalTo(14));
|
||||
StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_SKIP_SHRINK_STEP);
|
||||
StepKey expectedSecondKey = new StepKey(phase, ShrinkAction.NAME, WaitForNoFollowersStep.NAME);
|
||||
StepKey expectedThirdKey = new StepKey(phase, ShrinkAction.NAME, ReadOnlyAction.NAME);
|
||||
StepKey expectedFourthKey = new StepKey(phase, ShrinkAction.NAME, SetSingleNodeAllocateStep.NAME);
|
||||
StepKey expectedFifthKey = new StepKey(phase, ShrinkAction.NAME, CheckShrinkReadyStep.NAME);
|
||||
StepKey expectedSixthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkStep.NAME);
|
||||
StepKey expectedSeventhKey = new StepKey(phase, ShrinkAction.NAME, ShrunkShardsAllocatedStep.NAME);
|
||||
StepKey expectedEighthKey = new StepKey(phase, ShrinkAction.NAME, CopyExecutionStateStep.NAME);
|
||||
StepKey expectedNinthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_DATASTREAM_CHECK_KEY);
|
||||
StepKey expectedTenthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME);
|
||||
StepKey expectedEleventhKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME);
|
||||
StepKey expectedTwelveKey = new StepKey(phase, ShrinkAction.NAME, ReplaceDataStreamBackingIndexStep.NAME);
|
||||
StepKey expectedThirteenKey = new StepKey(phase, ShrinkAction.NAME, DeleteStep.NAME);
|
||||
StepKey expectedSecondKey = new StepKey(phase, ShrinkAction.NAME, CheckNotDataStreamWriteIndexStep.NAME);
|
||||
StepKey expectedThirdKey = new StepKey(phase, ShrinkAction.NAME, WaitForNoFollowersStep.NAME);
|
||||
StepKey expectedFourthKey = new StepKey(phase, ShrinkAction.NAME, ReadOnlyAction.NAME);
|
||||
StepKey expectedFifthKey = new StepKey(phase, ShrinkAction.NAME, SetSingleNodeAllocateStep.NAME);
|
||||
StepKey expectedSixthKey = new StepKey(phase, ShrinkAction.NAME, CheckShrinkReadyStep.NAME);
|
||||
StepKey expectedSeventhKey = new StepKey(phase, ShrinkAction.NAME, ShrinkStep.NAME);
|
||||
StepKey expectedEighthKey = new StepKey(phase, ShrinkAction.NAME, ShrunkShardsAllocatedStep.NAME);
|
||||
StepKey expectedNinthKey = new StepKey(phase, ShrinkAction.NAME, CopyExecutionStateStep.NAME);
|
||||
StepKey expectedTenthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkAction.CONDITIONAL_DATASTREAM_CHECK_KEY);
|
||||
StepKey expectedEleventhKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME);
|
||||
StepKey expectedTwelveKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME);
|
||||
StepKey expectedThirteenKey = new StepKey(phase, ShrinkAction.NAME, ReplaceDataStreamBackingIndexStep.NAME);
|
||||
StepKey expectedFourteenKey = new StepKey(phase, ShrinkAction.NAME, DeleteStep.NAME);
|
||||
|
||||
assertTrue(steps.get(0) instanceof BranchingStep);
|
||||
assertThat(steps.get(0).getKey(), equalTo(expectedFirstKey));
|
||||
|
@ -200,62 +148,66 @@ public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
|
|||
assertThat(((BranchingStep) steps.get(0)).getNextStepKeyOnFalse(), equalTo(expectedSecondKey));
|
||||
assertThat(((BranchingStep) steps.get(0)).getNextStepKeyOnTrue(), equalTo(nextStepKey));
|
||||
|
||||
assertTrue(steps.get(1) instanceof WaitForNoFollowersStep);
|
||||
assertTrue(steps.get(1) instanceof CheckNotDataStreamWriteIndexStep);
|
||||
assertThat(steps.get(1).getKey(), equalTo(expectedSecondKey));
|
||||
assertThat(steps.get(1).getNextStepKey(), equalTo(expectedThirdKey));
|
||||
|
||||
assertTrue(steps.get(2) instanceof UpdateSettingsStep);
|
||||
assertTrue(steps.get(2) instanceof WaitForNoFollowersStep);
|
||||
assertThat(steps.get(2).getKey(), equalTo(expectedThirdKey));
|
||||
assertThat(steps.get(2).getNextStepKey(), equalTo(expectedFourthKey));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(((UpdateSettingsStep)steps.get(2)).getSettings()));
|
||||
|
||||
assertTrue(steps.get(3) instanceof SetSingleNodeAllocateStep);
|
||||
assertTrue(steps.get(3) instanceof UpdateSettingsStep);
|
||||
assertThat(steps.get(3).getKey(), equalTo(expectedFourthKey));
|
||||
assertThat(steps.get(3).getNextStepKey(), equalTo(expectedFifthKey));
|
||||
assertTrue(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(((UpdateSettingsStep)steps.get(3)).getSettings()));
|
||||
|
||||
assertTrue(steps.get(4) instanceof CheckShrinkReadyStep);
|
||||
assertTrue(steps.get(4) instanceof SetSingleNodeAllocateStep);
|
||||
assertThat(steps.get(4).getKey(), equalTo(expectedFifthKey));
|
||||
assertThat(steps.get(4).getNextStepKey(), equalTo(expectedSixthKey));
|
||||
|
||||
assertTrue(steps.get(5) instanceof ShrinkStep);
|
||||
assertTrue(steps.get(5) instanceof CheckShrinkReadyStep);
|
||||
assertThat(steps.get(5).getKey(), equalTo(expectedSixthKey));
|
||||
assertThat(steps.get(5).getNextStepKey(), equalTo(expectedSeventhKey));
|
||||
assertThat(((ShrinkStep) steps.get(5)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(6) instanceof ShrunkShardsAllocatedStep);
|
||||
assertTrue(steps.get(6) instanceof ShrinkStep);
|
||||
assertThat(steps.get(6).getKey(), equalTo(expectedSeventhKey));
|
||||
assertThat(steps.get(6).getNextStepKey(), equalTo(expectedEighthKey));
|
||||
assertThat(((ShrunkShardsAllocatedStep) steps.get(6)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
assertThat(((ShrinkStep) steps.get(6)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(7) instanceof CopyExecutionStateStep);
|
||||
assertTrue(steps.get(7) instanceof ShrunkShardsAllocatedStep);
|
||||
assertThat(steps.get(7).getKey(), equalTo(expectedEighthKey));
|
||||
assertThat(steps.get(7).getNextStepKey(), equalTo(expectedNinthKey));
|
||||
assertThat(((CopyExecutionStateStep) steps.get(7)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
assertThat(((ShrunkShardsAllocatedStep) steps.get(7)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(8) instanceof BranchingStep);
|
||||
assertTrue(steps.get(8) instanceof CopyExecutionStateStep);
|
||||
assertThat(steps.get(8).getKey(), equalTo(expectedNinthKey));
|
||||
expectThrows(IllegalStateException.class, () -> steps.get(8).getNextStepKey());
|
||||
assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnFalse(), equalTo(expectedTenthKey));
|
||||
assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnTrue(), equalTo(expectedTwelveKey));
|
||||
assertThat(steps.get(8).getNextStepKey(), equalTo(expectedTenthKey));
|
||||
assertThat(((CopyExecutionStateStep) steps.get(8)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(9) instanceof ShrinkSetAliasStep);
|
||||
assertTrue(steps.get(9) instanceof BranchingStep);
|
||||
assertThat(steps.get(9).getKey(), equalTo(expectedTenthKey));
|
||||
assertThat(steps.get(9).getNextStepKey(), equalTo(expectedEleventhKey));
|
||||
assertThat(((ShrinkSetAliasStep) steps.get(9)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
expectThrows(IllegalStateException.class, () -> steps.get(9).getNextStepKey());
|
||||
assertThat(((BranchingStep) steps.get(9)).getNextStepKeyOnFalse(), equalTo(expectedEleventhKey));
|
||||
assertThat(((BranchingStep) steps.get(9)).getNextStepKeyOnTrue(), equalTo(expectedThirteenKey));
|
||||
|
||||
assertTrue(steps.get(10) instanceof ShrunkenIndexCheckStep);
|
||||
assertTrue(steps.get(10) instanceof ShrinkSetAliasStep);
|
||||
assertThat(steps.get(10).getKey(), equalTo(expectedEleventhKey));
|
||||
assertThat(steps.get(10).getNextStepKey(), equalTo(nextStepKey));
|
||||
assertThat(((ShrunkenIndexCheckStep) steps.get(10)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
assertThat(steps.get(10).getNextStepKey(), equalTo(expectedTwelveKey));
|
||||
assertThat(((ShrinkSetAliasStep) steps.get(10)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(11) instanceof ReplaceDataStreamBackingIndexStep);
|
||||
assertTrue(steps.get(11) instanceof ShrunkenIndexCheckStep);
|
||||
assertThat(steps.get(11).getKey(), equalTo(expectedTwelveKey));
|
||||
assertThat(steps.get(11).getNextStepKey(), equalTo(expectedThirteenKey));
|
||||
assertThat(((ReplaceDataStreamBackingIndexStep) steps.get(11)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
assertThat(steps.get(11).getNextStepKey(), equalTo(nextStepKey));
|
||||
assertThat(((ShrunkenIndexCheckStep) steps.get(11)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(12) instanceof DeleteStep);
|
||||
assertTrue(steps.get(12) instanceof ReplaceDataStreamBackingIndexStep);
|
||||
assertThat(steps.get(12).getKey(), equalTo(expectedThirteenKey));
|
||||
assertThat(steps.get(12).getNextStepKey(), equalTo(expectedEleventhKey));
|
||||
assertThat(steps.get(12).getNextStepKey(), equalTo(expectedFourteenKey));
|
||||
assertThat(((ReplaceDataStreamBackingIndexStep) steps.get(12)).getTargetIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||
|
||||
assertTrue(steps.get(13) instanceof DeleteStep);
|
||||
assertThat(steps.get(13).getKey(), equalTo(expectedFourteenKey));
|
||||
assertThat(steps.get(13).getNextStepKey(), equalTo(expectedTwelveKey));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ilm.Step;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -187,4 +188,20 @@ public final class TimeSeriesRestDriver {
|
|||
.endObject()));
|
||||
client.performRequest(request);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Map<String, Object> getOnlyIndexSettings(RestClient client, String index) throws IOException {
|
||||
Request request = new Request("GET", "/" + index + "/_settings");
|
||||
request.addParameter("flat_settings", "true");
|
||||
Response response = client.performRequest(request);
|
||||
try (InputStream is = response.getEntity().getContent()) {
|
||||
Map<String, Object> responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
|
||||
Map<String, Object> indexSettings = (Map<String, Object>) responseMap.get(index);
|
||||
if (indexSettings == null) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
return (Map<String, Object>) indexSettings.get("settings");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,14 +12,20 @@ import org.elasticsearch.cluster.metadata.Template;
|
|||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.xpack.core.ilm.CheckNotDataStreamWriteIndexStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
|
||||
import org.elasticsearch.xpack.core.ilm.FreezeAction;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate;
|
||||
|
@ -27,32 +33,29 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
|
|||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createSnapshotRepo;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getOnlyIndexSettings;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
|
||||
import static org.elasticsearch.xpack.core.ilm.ShrinkAction.CONDITIONAL_SKIP_SHRINK_STEP;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
||||
|
||||
private static final String FAILED_STEP_RETRY_COUNT_FIELD = "failed_step_retry_count";
|
||||
public static final String TIMESTAMP_MAPPING = "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"@timestamp\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
|
||||
public void testRolloverAction() throws Exception {
|
||||
String policyName = "logs-policy";
|
||||
createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, 1L));
|
||||
|
||||
String mapping = "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"@timestamp\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
Settings lifecycleNameSetting = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName).build();
|
||||
Template template = new Template(lifecycleNameSetting, new CompressedXContent(mapping), null);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
@ -68,19 +71,7 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
|||
String policyName = "logs-policy";
|
||||
createNewSingletonPolicy(client(), policyName, "warm", new ShrinkAction(1));
|
||||
|
||||
String mapping = "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"@timestamp\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
Settings settings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.build();
|
||||
Template template = new Template(settings, new CompressedXContent(mapping), null);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
@ -88,9 +79,8 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
|||
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
|
||||
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + backingIndexName;
|
||||
assertBusy(() -> assertThat(
|
||||
"original index must wait in the " + CONDITIONAL_SKIP_SHRINK_STEP + " until it is not the write index anymore",
|
||||
(Integer) explainIndex(client(), backingIndexName).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)),
|
||||
30, TimeUnit.SECONDS);
|
||||
"original index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",
|
||||
explainIndex(client(), backingIndexName).get("step"), is(CheckNotDataStreamWriteIndexStep.NAME)), 30, TimeUnit.SECONDS);
|
||||
|
||||
// Manual rollover the original index such that it's not the write index in the data stream anymore
|
||||
rolloverMaxOneDocCondition(client(), dataStream);
|
||||
|
@ -104,19 +94,7 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
|||
String policyName = "logs-policy";
|
||||
createFullPolicy(client(), policyName, TimeValue.ZERO);
|
||||
|
||||
String mapping = "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"@timestamp\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
Settings settings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.build();
|
||||
Template template = new Template(settings, new CompressedXContent(mapping), null);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
@ -137,19 +115,7 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
|||
String policyName = "logs-policy";
|
||||
createNewSingletonPolicy(client(), policyName, "cold", new SearchableSnapshotAction(snapshotRepo));
|
||||
|
||||
String mapping = "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"@timestamp\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
Settings settings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.build();
|
||||
Template template = new Template(settings, new CompressedXContent(mapping), null);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
||||
|
@ -169,4 +135,85 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
|||
assertBusy(() -> assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30,
|
||||
TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void testReadOnlyAction() throws Exception {
|
||||
String policyName = "logs-policy";
|
||||
createNewSingletonPolicy(client(), policyName, "warm", new ReadOnlyAction());
|
||||
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
||||
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
|
||||
assertBusy(() -> assertThat(
|
||||
"index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",
|
||||
explainIndex(client(), backingIndexName).get("step"), is(CheckNotDataStreamWriteIndexStep.NAME)),
|
||||
30, TimeUnit.SECONDS);
|
||||
|
||||
// Manual rollover the original index such that it's not the write index in the data stream anymore
|
||||
rolloverMaxOneDocCondition(client(), dataStream);
|
||||
|
||||
assertBusy(() -> assertThat(explainIndex(client(), backingIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30,
|
||||
TimeUnit.SECONDS);
|
||||
assertThat(getOnlyIndexSettings(client(), backingIndexName).get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()),
|
||||
equalTo("true"));
|
||||
}
|
||||
|
||||
public void testFreezeAction() throws Exception {
|
||||
String policyName = "logs-policy";
|
||||
createNewSingletonPolicy(client(), policyName, "cold", new FreezeAction());
|
||||
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
||||
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
|
||||
assertBusy(() -> assertThat(
|
||||
"index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",
|
||||
explainIndex(client(), backingIndexName).get("step"), is(CheckNotDataStreamWriteIndexStep.NAME)),
|
||||
30, TimeUnit.SECONDS);
|
||||
|
||||
// Manual rollover the original index such that it's not the write index in the data stream anymore
|
||||
rolloverMaxOneDocCondition(client(), dataStream);
|
||||
|
||||
assertBusy(() -> assertThat(explainIndex(client(), backingIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30,
|
||||
TimeUnit.SECONDS);
|
||||
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), backingIndexName);
|
||||
assertThat(settings.get(IndexMetadata.SETTING_BLOCKS_WRITE), equalTo("true"));
|
||||
assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true"));
|
||||
assertThat(settings.get("index.frozen"), equalTo("true"));
|
||||
}
|
||||
|
||||
public void testForceMergeAction() throws Exception {
|
||||
String policyName = "logs-policy";
|
||||
createNewSingletonPolicy(client(), policyName, "warm", new ForceMergeAction(1, null));
|
||||
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
||||
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
|
||||
assertBusy(() -> assertThat(
|
||||
"index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",
|
||||
explainIndex(client(), backingIndexName).get("step"), is(CheckNotDataStreamWriteIndexStep.NAME)),
|
||||
30, TimeUnit.SECONDS);
|
||||
|
||||
// Manual rollover the original index such that it's not the write index in the data stream anymore
|
||||
rolloverMaxOneDocCondition(client(), dataStream);
|
||||
|
||||
assertBusy(() -> assertThat(explainIndex(client(), backingIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30,
|
||||
TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private static Template getTemplate(String policyName) throws IOException {
|
||||
return new Template(getLifcycleSettings(policyName), new CompressedXContent(TIMESTAMP_MAPPING), null);
|
||||
}
|
||||
|
||||
private static Settings getLifcycleSettings(String policyName) {
|
||||
return Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.junit.Before;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -72,6 +71,7 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPol
|
|||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createSnapshotRepo;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explain;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getOnlyIndexSettings;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
|
||||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
|
||||
|
@ -247,13 +247,13 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
|
||||
assertBusy(() -> assertThat((Integer) explainIndex(client(), index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)),
|
||||
30, TimeUnit.SECONDS);
|
||||
assertFalse(getOnlyIndexSettings(index).containsKey("index.frozen"));
|
||||
assertFalse(getOnlyIndexSettings(client(), index).containsKey("index.frozen"));
|
||||
|
||||
Request request = new Request("PUT", index + "/_settings");
|
||||
request.setJsonEntity("{\"index.blocks.read_only\":false}");
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
assertBusy(() -> assertThat(getOnlyIndexSettings(index).get("index.frozen"), equalTo("true")));
|
||||
assertBusy(() -> assertThat(getOnlyIndexSettings(client(), index).get("index.frozen"), equalTo("true")));
|
||||
}
|
||||
|
||||
public void testRetryFailedShrinkAction() throws Exception {
|
||||
|
@ -283,7 +283,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), shrunkenIndex);
|
||||
assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue());
|
||||
|
@ -306,7 +306,8 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
index(client(), originalIndex, "_id", "foo", "bar");
|
||||
assertBusy(() -> assertTrue(indexExists(secondIndex)));
|
||||
assertBusy(() -> assertTrue(indexExists(originalIndex)));
|
||||
assertBusy(() -> assertEquals("true", getOnlyIndexSettings(originalIndex).get(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE)));
|
||||
assertBusy(() -> assertEquals("true",
|
||||
getOnlyIndexSettings(client(), originalIndex).get(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE)));
|
||||
}
|
||||
|
||||
public void testRolloverActionWithIndexingComplete() throws Exception {
|
||||
|
@ -346,7 +347,8 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertBusy(() -> assertThat(getStepKeyForIndex(client(), originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
|
||||
assertBusy(() -> assertTrue(indexExists(originalIndex)));
|
||||
assertBusy(() -> assertFalse(indexExists(secondIndex)));
|
||||
assertBusy(() -> assertEquals("true", getOnlyIndexSettings(originalIndex).get(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE)));
|
||||
assertBusy(() -> assertEquals("true",
|
||||
getOnlyIndexSettings(client(), originalIndex).get(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE)));
|
||||
}
|
||||
|
||||
public void testAllocateOnlyAllocation() throws Exception {
|
||||
|
@ -374,7 +376,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
createNewSingletonPolicy(client(), policy, endPhase, allocateAction);
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey()));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()), equalTo(String.valueOf(finalNumReplicas)));
|
||||
});
|
||||
|
@ -471,7 +473,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
updatePolicy(index, policy);
|
||||
assertBusy(() -> {
|
||||
assertThat(getStepKeyForIndex(client(), index).getAction(), equalTo("complete"));
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), not("true"));
|
||||
});
|
||||
indexDocument(client(), index);
|
||||
|
@ -522,7 +524,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
createNewSingletonPolicy(client(), policy, "warm", new ReadOnlyAction());
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
});
|
||||
|
@ -557,7 +559,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
|
||||
assertBusy(() -> {
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(numSegments.get(), equalTo(1));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
});
|
||||
|
@ -581,7 +583,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), shrunkenIndex);
|
||||
assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue());
|
||||
|
@ -600,7 +602,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertTrue(indexExists(index));
|
||||
assertFalse(indexExists(shrunkenIndex));
|
||||
assertFalse(aliasExists(shrunkenIndex, index));
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
|
||||
assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(numberOfShards)));
|
||||
assertNull(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()));
|
||||
|
@ -644,7 +646,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertBusy(() -> {
|
||||
assertTrue(indexExists(shrunkenIndex));
|
||||
assertTrue(aliasExists(shrunkenIndex, index));
|
||||
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), shrunkenIndex);
|
||||
assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
|
||||
assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(1)));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
|
@ -720,7 +722,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
createNewSingletonPolicy(client(), policy, "cold", new FreezeAction());
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("cold").getKey()));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true"));
|
||||
|
@ -758,7 +760,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
updatePolicy(index, policy);
|
||||
// assert that the index froze
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("cold").getKey()));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||
assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true"));
|
||||
|
@ -779,7 +781,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
createNewSingletonPolicy(client(), policy, "warm", new SetPriorityAction(priority));
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
|
||||
assertThat(settings.get(IndexMetadata.INDEX_PRIORITY_SETTING.getKey()), equalTo(String.valueOf(priority)));
|
||||
});
|
||||
|
@ -791,7 +793,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
createNewSingletonPolicy(client(), policy, "warm", new SetPriorityAction((Integer) null));
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> settings = getOnlyIndexSettings(index);
|
||||
Map<String, Object> settings = getOnlyIndexSettings(client(), index);
|
||||
assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
|
||||
assertNull(settings.get(IndexMetadata.INDEX_PRIORITY_SETTING.getKey()));
|
||||
});
|
||||
|
@ -1845,15 +1847,6 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertOK(client.performRequest(request));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> getOnlyIndexSettings(String index) throws IOException {
|
||||
Map<String, Object> response = (Map<String, Object>) getIndexSettings(index).get(index);
|
||||
if (response == null) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
return (Map<String, Object>) response.get("settings");
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private String getFailedStepForIndex(String indexName) throws IOException {
|
||||
Map<String, Object> indexResponse = explainIndex(client(), indexName);
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.ilm.AllocateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.AllocationRoutedStep;
|
||||
import org.elasticsearch.xpack.core.ilm.CheckNotDataStreamWriteIndexStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
|
||||
import org.elasticsearch.xpack.core.ilm.FreezeAction;
|
||||
|
@ -159,9 +160,12 @@ public class TransportPutLifecycleActionTests extends ESTestCase {
|
|||
logger.info("--> phaseDef: {}", phaseDef);
|
||||
|
||||
assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, phaseDef, "phase"),
|
||||
contains(new Step.StepKey("phase", "freeze", FreezeAction.NAME),
|
||||
contains(
|
||||
new Step.StepKey("phase", "freeze", CheckNotDataStreamWriteIndexStep.NAME),
|
||||
new Step.StepKey("phase", "freeze", FreezeAction.NAME),
|
||||
new Step.StepKey("phase", "allocate", AllocateAction.NAME),
|
||||
new Step.StepKey("phase", "allocate", AllocationRoutedStep.NAME),
|
||||
new Step.StepKey("phase", "forcemerge", CheckNotDataStreamWriteIndexStep.NAME),
|
||||
new Step.StepKey("phase", "forcemerge", ReadOnlyAction.NAME),
|
||||
new Step.StepKey("phase", "forcemerge", ForceMergeAction.NAME),
|
||||
new Step.StepKey("phase", "forcemerge", SegmentCountStep.NAME)));
|
||||
|
|
Loading…
Reference in New Issue