[7.x] ILM add data stream support to searchable snapshot action (#57873) (#57916)

(cherry picked from commit 34856a90532c6c62a53817bb395399c8a8c17c0f)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
Andrei Dan 2020-06-10 10:16:57 +01:00 committed by GitHub
parent 80f221e920
commit 9f280621ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 162 additions and 34 deletions

View File

@ -5,17 +5,23 @@
*/
package org.elasticsearch.xpack.core.ilm;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import java.util.Locale;
/**
* Deletes a single index.
*/
public class DeleteStep extends AsyncRetryDuringSnapshotActionStep {
public static final String NAME = "delete";
private static final Logger logger = LogManager.getLogger(DeleteStep.class);
public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
@ -23,8 +29,26 @@ public class DeleteStep extends AsyncRetryDuringSnapshotActionStep {
@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, Listener listener) {
String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
String indexName = indexMetadata.getIndex().getName();
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(indexName);
assert indexAbstraction != null : "invalid cluster metadata. index [" + indexName + "] was not found";
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
if (dataStream != null) {
assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index";
if (dataStream.getWriteIndex().getIndex().getName().equals(indexName)) {
String errorMessage = String.format(Locale.ROOT, "index [%s] is the write index for data stream [%s]. " +
"stopping execution of lifecycle [%s] as a data stream's write index cannot be deleted. manually rolling over the" +
" index will resume the execution of the policy as the index will not be the data stream's write index anymore",
indexName, dataStream.getName(), policyName);
logger.debug(errorMessage);
throw new IllegalStateException(errorMessage);
}
}
getClient().admin().indices()
.delete(new DeleteIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
.delete(new DeleteIndexRequest(indexName).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ilm;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -30,6 +31,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
public static final String NAME = "searchable_snapshot";
public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
public static final String RESTORED_INDEX_PREFIX = "restored-";
@ -66,8 +68,11 @@ public class SearchableSnapshotAction implements LifecycleAction {
StepKey mountSnapshotKey = new StepKey(phase, NAME, MountSnapshotStep.NAME);
StepKey waitForGreenRestoredIndexKey = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY);
StepKey copyLifecyclePolicySettingKey = new StepKey(phase, NAME, CopySettingsStep.NAME);
StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);
StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, generateSnapshotNameKey,
client);
@ -84,15 +89,25 @@ public class SearchableSnapshotAction implements LifecycleAction {
// case
CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep(copyMetadataKey, copyLifecyclePolicySettingKey,
RESTORED_INDEX_PREFIX, nextStepKey != null ? nextStepKey.getName() : "null");
CopySettingsStep copySettingsStep = new CopySettingsStep(copyLifecyclePolicySettingKey, swapAliasesKey, RESTORED_INDEX_PREFIX,
LifecycleSettings.LIFECYCLE_NAME);
CopySettingsStep copySettingsStep = new CopySettingsStep(copyLifecyclePolicySettingKey, dataStreamCheckBranchingKey,
RESTORED_INDEX_PREFIX, LifecycleSettings.LIFECYCLE_NAME);
BranchingStep isDataStreamBranchingStep = new BranchingStep(dataStreamCheckBranchingKey, swapAliasesKey, replaceDataStreamIndexKey,
(index, clusterState) -> {
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
return indexAbstraction.getParentDataStream() != null;
});
ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(replaceDataStreamIndexKey,
deleteIndexKey, RESTORED_INDEX_PREFIX);
DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, null, client);
// sending this step to null as the restored index (which will after this step essentially be the source index) was sent to the next
// key after we restored the lifecycle execution state
SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep(swapAliasesKey,
null, client, RESTORED_INDEX_PREFIX);
return Arrays.asList(waitForNoFollowersStep, generateSnapshotNameStep, cleanupSnapshotStep, createSnapshotBranchingStep,
mountSnapshotStep, waitForGreenIndexHealthStep, copyMetadataStep, copySettingsStep, swapAliasesAndDeleteSourceIndexStep);
mountSnapshotStep, waitForGreenIndexHealthStep, copyMetadataStep, copySettingsStep, isDataStreamBranchingStep,
replaceDataStreamBackingIndex, deleteSourceIndexStep, swapAliasesAndDeleteSourceIndexStep);
}
@Override

View File

@ -11,11 +11,15 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.mockito.Mockito;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class DeleteStepTests extends AbstractStepMasterTimeoutTestCase<DeleteStep> {
@ -78,7 +82,10 @@ public class DeleteStepTests extends AbstractStepMasterTimeoutTestCase<DeleteSte
SetOnce<Boolean> actionCompleted = new SetOnce<>();
DeleteStep step = createRandomInstance();
step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() {
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder().put(indexMetadata, true).build()
).build();
step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {
actionCompleted.set(complete);
@ -114,7 +121,10 @@ public class DeleteStepTests extends AbstractStepMasterTimeoutTestCase<DeleteSte
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
DeleteStep step = createRandomInstance();
step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() {
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder().put(indexMetadata, true).build()
).build();
step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {
throw new AssertionError("Unexpected method call");
@ -129,4 +139,36 @@ public class DeleteStepTests extends AbstractStepMasterTimeoutTestCase<DeleteSte
assertThat(exceptionThrown.get(), equalTo(true));
}
public void testPerformActionThrowsExceptionIfIndexIsTheDataStreamWriteIndex() {
String dataStreamName = randomAlphaOfLength(10);
String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String policyName = "test-ilm-policy";
IndexMetadata sourceIndexMetadata =
IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
DataStream dataStream = new DataStream(dataStreamName, "timestamp",
org.elasticsearch.common.collect.List.of(sourceIndexMetadata.getIndex()));
ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
Metadata.builder().put(sourceIndexMetadata, true).put(dataStream).build()
).build();
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class,
() -> createRandomInstance().performDuringNoSnapshot(sourceIndexMetadata, clusterState, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {
fail("unexpected listener callback");
}
@Override
public void onFailure(Exception e) {
fail("unexpected listener callback");
}
}));
assertThat(illegalStateException.getMessage(),
is("index [" + indexName + "] is the write index for data stream [" + dataStreamName + "]. stopping execution of lifecycle" +
" [test-ilm-policy] as a data stream's write index cannot be deleted. manually rolling over the index will resume the " +
"execution of the policy as the index will not be the data stream's write index anymore"));
}
}

View File

@ -28,13 +28,16 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
StepKey expectedSixthStep = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
StepKey expectedSeventhStep = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey expectedEighthStep = new StepKey(phase, NAME, CopySettingsStep.NAME);
StepKey expectedNinthStep = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);
StepKey expectedNinthStep = new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY);
StepKey expectedTenthStep = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey expectedElevenStep = new StepKey(phase, NAME, DeleteStep.NAME);
StepKey expectedTwelveStep = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);
SearchableSnapshotAction action = createTestInstance();
StepKey nextStepKey = new StepKey(phase, randomAlphaOfLengthBetween(1, 5), randomAlphaOfLengthBetween(1, 5));
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertThat(steps.size(), is(9));
assertThat(steps.size(), is(12));
assertThat(steps.get(0).getKey(), is(expectedFirstStep));
assertThat(steps.get(1).getKey(), is(expectedSecondStep));
@ -45,6 +48,9 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
assertThat(steps.get(6).getKey(), is(expectedSeventhStep));
assertThat(steps.get(7).getKey(), is(expectedEighthStep));
assertThat(steps.get(8).getKey(), is(expectedNinthStep));
assertThat(steps.get(9).getKey(), is(expectedTenthStep));
assertThat(steps.get(10).getKey(), is(expectedElevenStep));
assertThat(steps.get(11).getKey(), is(expectedTwelveStep));
AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(3);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedThirdStep));

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ilm.AllocateAction;
import org.elasticsearch.xpack.core.ilm.DeleteAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
@ -39,6 +40,7 @@ import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween;
/**
* This class provides the operational REST functions needed to control an ILM time series lifecycle.
@ -169,4 +171,20 @@ public final class TimeSeriesRestDriver {
request.setEntity(entity);
client.performRequest(request);
}
public static void createSnapshotRepo(RestClient client, String repoName, boolean compress) throws IOException {
Request request = new Request("PUT", "/_snapshot/" + repoName);
request.setJsonEntity(Strings
.toString(JsonXContent.contentBuilder()
.startObject()
.field("type", "fs")
.startObject("settings")
.field("compress", compress)
//random location to avoid clash with other snapshots
.field("location", System.getProperty("tests.path.repo") + "/" + randomAlphaOfLengthBetween(4, 10))
.field("max_snapshot_bytes_per_sec", "100m")
.endObject()
.endObject()));
client.performRequest(request);
}
}

View File

@ -6,15 +6,17 @@
package org.elasticsearch.xpack.ilm;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.ReplaceDataStreamBackingIndexStep;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import java.util.concurrent.TimeUnit;
@ -22,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createSnapshotRepo;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
@ -105,4 +108,36 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
assertBusy(() -> assertFalse("the shrunken index was deleted by the delete action", indexExists(shrunkenIndex)),
30, TimeUnit.SECONDS);
}
public void testSearchableSnapshotAction() throws Exception {
String snapshotRepo = randomAlphaOfLengthBetween(5, 10);
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
String policyName = "logs-policy";
createNewSingletonPolicy(client(), policyName, "cold", new SearchableSnapshotAction(snapshotRepo));
Settings settings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.build();
Template template = new Template(settings, null, null);
createComposableTemplate(client(), "logs-template", "logs-foo*", template);
String dataStream = "logs-foo";
indexDocument(client(), dataStream, true);
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + backingIndexName;
assertBusy(() -> assertThat(indexExists(restoredIndexName), is(true)));
assertBusy(() -> assertThat(
"original index must wait in the " + ReplaceDataStreamBackingIndexStep.NAME + " until it is not the write index anymore",
(Integer) explainIndex(client(), backingIndexName).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)),
30, TimeUnit.SECONDS);
// Manual rollover the original index such that it's not the write index in the data stream anymore
rolloverMaxOneDocCondition(client(), dataStream);
assertBusy(() -> assertFalse(indexExists(backingIndexName)), 60, TimeUnit.SECONDS);
assertBusy(() -> assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30,
TimeUnit.SECONDS);
}
}

View File

@ -69,6 +69,7 @@ import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createSnapshotRepo;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explain;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
@ -391,8 +392,9 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertThat(indexILMState.get("failed_step"), is("wait-for-snapshot"));
}, slmPolicy);
String repo = createSnapshotRepo();
createSlmPolicy(slmPolicy, repo);
String snapshotRepo = randomAlphaOfLengthBetween(4, 10);
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
createSlmPolicy(slmPolicy, snapshotRepo);
assertBusy( () -> {
Map<String, Object> indexILMState = explainIndex(client(), index);
@ -414,8 +416,9 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
String slmPolicy = randomAlphaOfLengthBetween(4, 10);
createNewSingletonPolicy(client(), policy, "delete", new WaitForSnapshotAction(slmPolicy));
String repo = createSnapshotRepo();
createSlmPolicy(slmPolicy, repo);
String snapshotRepo = randomAlphaOfLengthBetween(4, 10);
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
createSlmPolicy(slmPolicy, snapshotRepo);
Request request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute");
assertOK(client().performRequest(request));
@ -1557,7 +1560,8 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
}
public void testSearchableSnapshotAction() throws Exception {
String snapshotRepo = createSnapshotRepo();
String snapshotRepo = randomAlphaOfLengthBetween(4, 10);
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo));
createIndexWithSettings(index,
@ -1582,7 +1586,8 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/54433")
public void testDeleteActionDeletesSearchableSnapshot() throws Exception {
String snapshotRepo = createSnapshotRepo();
String snapshotRepo = randomAlphaOfLengthBetween(4, 10);
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
// create policy with cold and delete phases
Map<String, LifecycleAction> coldActions =
@ -1638,7 +1643,8 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
@SuppressWarnings("unchecked")
public void testDeleteActionDoesntDeleteSearchableSnapshot() throws Exception {
String snapshotRepo = createSnapshotRepo();
String snapshotRepo = randomAlphaOfLengthBetween(4, 10);
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
// create policy with cold and delete phases
Map<String, LifecycleAction> coldActions =
@ -1887,24 +1893,6 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertOK(client().performRequest(request));
}
private String createSnapshotRepo() throws IOException {
String repo = randomAlphaOfLengthBetween(4, 10);
Request request = new Request("PUT", "/_snapshot/" + repo);
request.setJsonEntity(Strings
.toString(JsonXContent.contentBuilder()
.startObject()
.field("type", "fs")
.startObject("settings")
.field("compress", randomBoolean())
//random location to avoid clash with other snapshots
.field("location", System.getProperty("tests.path.repo")+ "/" + randomAlphaOfLengthBetween(4, 10))
.field("max_snapshot_bytes_per_sec", "100m")
.endObject()
.endObject()));
assertOK(client().performRequest(request));
return repo;
}
//adds debug information for waitForSnapshot tests
private void assertBusy(CheckedRunnable<Exception> runnable, String slmPolicy) throws Exception {
assertBusy(() -> {