diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java index 76a3b750c93..eeb85ddbaf9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java @@ -98,11 +98,11 @@ public class DataStreamIT extends ESIntegTestCase { } public void testBasicScenario() throws Exception { - createIndexTemplate("id1", "metrics-foo*", "@timestamp1"); + createIndexTemplate("id1", "@timestamp1", "metrics-foo*"); CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); client().admin().indices().createDataStream(createDataStreamRequest).get(); - createIndexTemplate("id2", "metrics-bar*", "@timestamp2"); + createIndexTemplate("id2", "@timestamp2", "metrics-bar*"); createDataStreamRequest = new CreateDataStreamAction.Request("metrics-bar"); client().admin().indices().createDataStream(createDataStreamRequest).get(); @@ -186,7 +186,7 @@ public class DataStreamIT extends ESIntegTestCase { } public void testOtherWriteOps() throws Exception { - createIndexTemplate("id", "metrics-foobar*", "@timestamp1"); + createIndexTemplate("id", "@timestamp1", "metrics-foobar*"); String dataStreamName = "metrics-foobar"; CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); client().admin().indices().createDataStream(createDataStreamRequest).get(); @@ -351,8 +351,8 @@ public class DataStreamIT extends ESIntegTestCase { "[date, date_nanos], but instead found type [keyword]")); } - public void testDataStreamsResolvability() throws Exception { - createIndexTemplate("id", "logs-*", "ts"); + public void testResolvabilityOfDataStreamsInAPIs() throws Exception { + createIndexTemplate("id", "ts", "logs-*"); String dataStreamName = "logs-foobar"; CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(dataStreamName); client().admin().indices().createDataStream(request).actionGet(); @@ -413,7 +413,7 @@ public class DataStreamIT extends ESIntegTestCase { } public void testCannotDeleteComposableTemplateUsedByDataStream() throws Exception { - createIndexTemplate("id", "metrics-foobar*", "@timestamp1"); + createIndexTemplate("id", "@timestamp1", "metrics-foobar*"); String dataStreamName = "metrics-foobar-baz"; CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); client().admin().indices().createDataStream(createDataStreamRequest).get(); @@ -436,7 +436,7 @@ public class DataStreamIT extends ESIntegTestCase { } public void testAliasActionsFailOnDataStreams() throws Exception { - createIndexTemplate("id1", "metrics-foo*", "@timestamp1"); + createIndexTemplate("id1", "@timestamp1", "metrics-foo*"); String dataStreamName = "metrics-foo"; CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); client().admin().indices().createDataStream(createDataStreamRequest).get(); @@ -449,7 +449,7 @@ public class DataStreamIT extends ESIntegTestCase { } public void testAliasActionsFailOnDataStreamBackingIndices() throws Exception { - createIndexTemplate("id1", "metrics-foo*", "@timestamp1"); + createIndexTemplate("id1", "@timestamp1", "metrics-foo*"); String dataStreamName = "metrics-foo"; CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); client().admin().indices().createDataStream(createDataStreamRequest).get(); @@ -539,11 +539,11 @@ public class DataStreamIT extends ESIntegTestCase { "] matches a data stream, specify the corresponding concrete indices instead.")); } - static void createIndexTemplate(String id, String pattern, String timestampFieldName) throws IOException { + public static void createIndexTemplate(String id, String timestampFieldName, String... pattern) throws IOException { PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id); request.indexTemplate( new ComposableIndexTemplate( - Collections.singletonList(pattern), + Arrays.asList(pattern), new Template(null, new CompressedXContent(MetadataCreateDataStreamServiceTests.generateMapping(timestampFieldName)), null), null, null, null, null, diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DataStreamsSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DataStreamsSnapshotsIT.java new file mode 100644 index 00000000000..942b008427b --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DataStreamsSnapshotsIT.java @@ -0,0 +1,218 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.indices.DataStreamIT; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.junit.Before; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase { + + private static final String DS_BACKING_INDEX_NAME = DataStream.getDefaultBackingIndexName("ds", 1); + private static final String DS2_BACKING_INDEX_NAME = DataStream.getDefaultBackingIndexName("ds2", 1); + private static final Map DOCUMENT_SOURCE = Collections.singletonMap("@timestamp", 123); + public static final String REPO = "repo"; + public static final String SNAPSHOT = "snap"; + private Client client; + + private String id; + + @Before + public void setup() throws Exception { + client = client(); + Path location = randomRepoPath(); + createRepository(REPO, "fs", location); + + DataStreamIT.createIndexTemplate("t1", "@timestamp", "ds", "other-ds"); + + CreateDataStreamAction.Request request = new CreateDataStreamAction.Request("ds"); + AcknowledgedResponse response = client.admin().indices().createDataStream(request).get(); + assertTrue(response.isAcknowledged()); + + request = new CreateDataStreamAction.Request("other-ds"); + response = client.admin().indices().createDataStream(request).get(); + assertTrue(response.isAcknowledged()); + + IndexResponse indexResponse = client.prepareIndex("ds", "_doc") + .setOpType(DocWriteRequest.OpType.CREATE) + .setSource(DOCUMENT_SOURCE) + .get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + id = indexResponse.getId(); + } + + public void testSnapshotAndRestore() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + GetSnapshotsResponse snapshot = client.admin().cluster().prepareGetSnapshots(REPO).setSnapshots(SNAPSHOT).get(); + List snap = snapshot.getSnapshots(); + assertEquals(1, snap.size()); + assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), snap.get(0).indices()); + + assertTrue(client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request("ds")).get().isAcknowledged()); + + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster() + .prepareRestoreSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .get(); + + assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards()); + + assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap()); + SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits(); + assertEquals(1, hits.length); + assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); + + GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds")).get(); + assertEquals(1, ds.getDataStreams().size()); + assertEquals(1, ds.getDataStreams().get(0).getIndices().size()); + assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName()); + } + + public void testRename() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + expectThrows(SnapshotRestoreException.class, () -> client.admin().cluster() + .prepareRestoreSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .get()); + + client.admin().cluster() + .prepareRestoreSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .setRenamePattern("ds") + .setRenameReplacement("ds2") + .get(); + + GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get(); + assertEquals(1, ds.getDataStreams().size()); + assertEquals(1, ds.getDataStreams().get(0).getIndices().size()); + assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName()); + assertEquals(DOCUMENT_SOURCE, client.prepareSearch("ds2").get().getHits().getHits()[0].getSourceAsMap()); + assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS2_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap()); + } + + public void testWildcards() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster() + .prepareCreateSnapshot(REPO, "snap2") + .setWaitForCompletion(true) + .setIndices("d*") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster() + .prepareRestoreSnapshot(REPO, "snap2") + .setWaitForCompletion(true) + .setIndices("d*") + .setRenamePattern("ds") + .setRenameReplacement("ds2") + .get(); + + assertEquals(RestStatus.OK, restoreSnapshotResponse.status()); + + GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get(); + assertEquals(1, ds.getDataStreams().size()); + assertEquals(1, ds.getDataStreams().get(0).getIndices().size()); + assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName()); + } + + public void testDataStreamNotStoredWhenIndexRequested() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster() + .prepareCreateSnapshot(REPO, "snap2") + .setWaitForCompletion(true) + .setIndices(DS_BACKING_INDEX_NAME) + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + expectThrows(Exception.class, () -> client.admin().cluster() + .prepareRestoreSnapshot(REPO, "snap2") + .setWaitForCompletion(true) + .setIndices("ds") + .get()); + } + + public void testDataStreamNotRestoredWhenIndexRequested() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster() + .prepareCreateSnapshot(REPO, "snap2") + .setWaitForCompletion(true) + .setIndices("ds") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + assertTrue(client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request("ds")).get().isAcknowledged()); + + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster() + .prepareRestoreSnapshot(REPO, "snap2") + .setWaitForCompletion(true) + .setIndices(".ds-ds-*") + .get(); + + assertEquals(RestStatus.OK, restoreSnapshotResponse.status()); + + GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request("ds"); + expectThrows(ResourceNotFoundException.class, () -> client.admin().indices().getDataStreams(getRequest).actionGet()); + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index a297a921731..92b88c85d49 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -34,6 +35,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -52,6 +54,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; @@ -78,6 +81,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.DataStreamIT; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.ingest.IngestTestPlugin; @@ -132,6 +136,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertRequestBuilderThrows; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -2408,6 +2413,61 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } } + public void testDeleteDataStreamDuringSnapshot() throws Exception { + Client client = client(); + + logger.info("--> creating repository"); + + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put("block_on_data", true))); + + + String dataStream = "test-ds"; + DataStreamIT.createIndexTemplate("dst", "@timestamp", dataStream); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + client.prepareIndex(dataStream, "_doc") + .setOpType(DocWriteRequest.OpType.CREATE) + .setId(Integer.toString(i)) + .setSource(Collections.singletonMap("k", "v")) + .execute().actionGet(); + } + refresh(); + + assertThat(client.prepareSearch(dataStream).setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); + + logger.info("--> snapshot"); + ActionFuture future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setIndices(dataStream).setWaitForCompletion(true).setPartial(false).execute(); + logger.info("--> wait for block to kick in"); + waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + + // non-partial snapshots do not allow delete operations on data streams where snapshot has not been completed + try { + logger.info("--> delete index while non-partial snapshot is running"); + client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(dataStream)).actionGet(); + fail("Expected deleting index to fail during snapshot"); + } catch (SnapshotInProgressException e) { + assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: [test-ds")); + } finally { + logger.info("--> unblock all data nodes"); + unblockAllDataNodes("test-repo"); + } + logger.info("--> waiting for snapshot to finish"); + CreateSnapshotResponse createSnapshotResponse = future.get(); + + logger.info("Snapshot successfully completed"); + SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo((SnapshotState.SUCCESS))); + assertThat(snapshotInfo.dataStreams(), contains(dataStream)); + assertThat(snapshotInfo.indices(), contains(DataStream.getDefaultBackingIndexName(dataStream, 1))); + } + public void testCloseOrDeleteIndexDuringSnapshot() throws Exception { disableRepoConsistencyCheck("This test intentionally leaves a broken repository"); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index fa0b4155656..4e425ec6b5a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -255,7 +255,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction indices = snapshotsToIndices.getOrDefault(snapshotId, Collections.emptyList()); CollectionUtil.timSort(indices); - snapshotInfos.add(new SnapshotInfo(snapshotId, indices, repositoryData.getSnapshotState(snapshotId))); + snapshotInfos.add(new SnapshotInfo(snapshotId, indices, Collections.emptyList(), repositoryData.getSnapshotState(snapshotId))); } CollectionUtil.timSort(snapshotInfos); return unmodifiableList(snapshotInfos); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index d95d8402408..93e572c97e8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -46,6 +46,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.snapshots.SnapshotInProgressException; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -173,6 +175,13 @@ public class DeleteDataStreamAction extends ActionType { } throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found"); } + + Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(currentState, dataStreams); + if (snapshottingDataStreams.isEmpty() == false) { + throw new SnapshotInProgressException("Cannot delete data streams that are being snapshotted: " + snapshottingDataStreams + + ". Try again after snapshot finishes or cancel the currently running snapshot."); + } + List dataStreamsToRemove = new ArrayList<>(); Set backingIndicesToRemove = new HashSet<>(); for (String dataStreamName : dataStreams) { diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 7e06577ac58..d19e6f96e8b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -52,6 +52,8 @@ import java.util.stream.Collectors; import static org.elasticsearch.snapshots.SnapshotInfo.METADATA_FIELD_INTRODUCED; +import static org.elasticsearch.snapshots.SnapshotInfo.DATA_STREAMS_IN_SNAPSHOT; + /** * Meta data about snapshots that are currently executing */ @@ -92,6 +94,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement private final boolean partial; private final ImmutableOpenMap shards; private final List indices; + private final List dataStreams; private final ImmutableOpenMap> waitingIndices; private final long startTime; private final long repositoryStateId; @@ -101,13 +104,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure, Map userMetadata, Version version) { + List dataStreams, long startTime, long repositoryStateId, + ImmutableOpenMap shards, String failure, Map userMetadata, + Version version) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; this.partial = partial; this.indices = indices; + this.dataStreams = dataStreams; this.startTime = startTime; if (shards == null) { this.shards = ImmutableOpenMap.of(); @@ -136,26 +141,33 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return true; } + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + List dataStreams, long startTime, long repositoryStateId, + ImmutableOpenMap shards, Map userMetadata, Version version) { + this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, + null, userMetadata, version); + } + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, Map userMetadata, Version version) { - this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata, - version); + this(snapshot, includeGlobalState, partial, state, indices, Collections.emptyList(), startTime, repositoryStateId, shards, + null, userMetadata, version); } public Entry(Entry entry, State state, List indices, long repositoryStateId, ImmutableOpenMap shards, Version version, String failure) { - this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards, - failure, entry.userMetadata, version); + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.dataStreams, entry.startTime, + repositoryStateId, shards, failure, entry.userMetadata, version); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { - this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.dataStreams, entry.startTime, entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.version); } public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { - this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.dataStreams, entry.startTime, entry.repositoryStateId, shards, failure, entry.userMetadata, entry.version); } @@ -204,6 +216,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return startTime; } + public List dataStreams() { + return dataStreams; + } + @Override public long repositoryStateId() { return repositoryStateId; @@ -293,6 +309,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement } } builder.endArray(); + builder.array(DATA_STREAMS, dataStreams.toArray(new String[0])); builder.endObject(); return builder; } @@ -552,11 +569,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement } else { version = SnapshotsService.OLD_SNAPSHOT_FORMAT; } + + List dataStreams; + if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { + dataStreams = in.readStringList(); + } else { + dataStreams = Collections.emptyList(); + } entries[i] = new Entry(snapshot, includeGlobalState, partial, state, Collections.unmodifiableList(indexBuilder), + dataStreams, startTime, repositoryStateId, builder.build(), @@ -603,6 +628,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement } else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { out.writeBoolean(SnapshotsService.useShardGenerations(entry.version)); } + if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { + out.writeStringCollection(entry.dataStreams); + } } } @@ -614,6 +642,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement private static final String PARTIAL = "partial"; private static final String STATE = "state"; private static final String INDICES = "indices"; + private static final String DATA_STREAMS = "data_streams"; private static final String START_TIME_MILLIS = "start_time_millis"; private static final String START_TIME = "start_time"; private static final String REPOSITORY_STATE_ID = "repository_state_id"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index b3666608c92..82140986d3e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -38,6 +38,7 @@ import java.util.Objects; public final class DataStream extends AbstractDiffable implements ToXContentObject { public static final String BACKING_INDEX_PREFIX = ".ds-"; + public static final String DATA_STREAMS_METADATA_FIELD = "data-streams"; private final String name; private final String timeStampField; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 4f4ff4ae994..deb885ceac2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -121,6 +121,15 @@ public class IndexNameExpressionResolver { return concreteIndexNames(context, indexExpressions); } + public List dataStreamNames(ClusterState state, IndicesOptions options, String... indexExpressions) { + Context context = new Context(state, options, false, false, true); + return Arrays.stream(indexExpressions) + .flatMap(expression -> WildcardExpressionResolver.matches(context, state.metadata(), expression).values().stream()) + .filter(i -> i.getType() == IndexAbstraction.Type.DATA_STREAM) + .map(IndexAbstraction::getName) + .collect(Collectors.toList()); + } + /** * Translates the provided index expression into actual concrete indices, properly deduplicated. * diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a2f7ee4158c..701401c8515 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1040,6 +1040,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp executor.execute(ActionRunnable.supply(allMetaListener, () -> { final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, indices.stream().map(IndexId::getName).collect(Collectors.toList()), + new ArrayList<>(clusterMetadata.dataStreams().keySet()), startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, includeGlobalState, userMetadata); snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index a6cade46068..ebc57960345 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -29,6 +29,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -41,6 +42,7 @@ import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -75,6 +77,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -82,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -201,12 +205,38 @@ public class RestoreService implements ClusterStateApplier { // Make sure that we can restore from this snapshot validateSnapshotRestorable(repositoryName, snapshotInfo); + Metadata globalMetadata = null; // Resolve the indices from the snapshot that need to be restored - final List indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions()); + Map dataStreams; + List requestIndices = new ArrayList<>(Arrays.asList(request.indices())); + + List requestedDataStreams = filterIndices(snapshotInfo.dataStreams(), requestIndices.toArray(new String[0]), + IndicesOptions.fromOptions(true, true, true, true)); + if (requestedDataStreams.isEmpty()) { + dataStreams = new HashMap<>(); + } else { + globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId); + dataStreams = globalMetadata.dataStreams(); + if (request.includeGlobalState() == false) { + dataStreams.keySet().retainAll(requestedDataStreams); + } + } + requestIndices.removeAll(dataStreams.keySet()); + Set dataStreamIndices = dataStreams.values().stream() + .flatMap(ds -> ds.getIndices().stream()) + .map(Index::getName) + .collect(Collectors.toSet()); + requestIndices.addAll(dataStreamIndices); + + final List indicesInSnapshot = filterIndices(snapshotInfo.indices(), requestIndices.toArray(new String[0]), + request.indicesOptions()); final Metadata.Builder metadataBuilder; if (request.includeGlobalState()) { - metadataBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(snapshotId)); + if (globalMetadata == null) { + globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId); + } + metadataBuilder = Metadata.builder(globalMetadata); } else { metadataBuilder = Metadata.builder(); } @@ -220,7 +250,7 @@ public class RestoreService implements ClusterStateApplier { // Apply renaming on index names, returning a map of names where // the key is the renamed index and the value is the original name - final Map indices = renamedIndices(request, indicesInSnapshot); + final Map indices = renamedIndices(request, indicesInSnapshot, dataStreamIndices); // Now we can start the actual restore process by adding shards to be recovered in the cluster state // and updating cluster metadata (global and index) as needed @@ -389,6 +419,12 @@ public class RestoreService implements ClusterStateApplier { checkAliasNameConflicts(indices, aliases); + Map updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams()); + updatedDataStreams.putAll(dataStreams.values().stream() + .map(ds -> updateDataStream(ds, mdBuilder, request)) + .collect(Collectors.toMap(DataStream::getName, Function.identity()))); + mdBuilder.dataStreams(updatedDataStreams); + // Restore global state if needed if (request.includeGlobalState()) { if (metadata.persistentSettings() != null) { @@ -560,6 +596,18 @@ public class RestoreService implements ClusterStateApplier { } } + //visible for testing + static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metadata, RestoreSnapshotRequest request) { + String dataStreamName = dataStream.getName(); + if (request.renamePattern() != null && request.renameReplacement() != null) { + dataStreamName = dataStreamName.replaceAll(request.renamePattern(), request.renameReplacement()); + } + List updatedIndices = dataStream.getIndices().stream() + .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex()) + .collect(Collectors.toList()); + return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration()); + } + public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { boolean changesMade = false; RestoreInProgress.Builder builder = new RestoreInProgress.Builder(); @@ -831,13 +879,11 @@ public class RestoreService implements ClusterStateApplier { return failedShards; } - private static Map renamedIndices(RestoreSnapshotRequest request, List filteredIndices) { + private static Map renamedIndices(RestoreSnapshotRequest request, List filteredIndices, + Set dataStreamIndices) { Map renamedIndices = new HashMap<>(); for (String index : filteredIndices) { - String renamedIndex = index; - if (request.renameReplacement() != null && request.renamePattern() != null) { - renamedIndex = index.replaceAll(request.renamePattern(), request.renameReplacement()); - } + String renamedIndex = renameIndex(index, request, dataStreamIndices.contains(index)); String previousIndex = renamedIndices.put(renamedIndex, index); if (previousIndex != null) { throw new SnapshotRestoreException(request.repository(), request.snapshot(), @@ -847,6 +893,21 @@ public class RestoreService implements ClusterStateApplier { return Collections.unmodifiableMap(renamedIndices); } + private static String renameIndex(String index, RestoreSnapshotRequest request, boolean partOfDataStream) { + String renamedIndex = index; + if (request.renameReplacement() != null && request.renamePattern() != null) { + partOfDataStream = partOfDataStream && index.startsWith(DataStream.BACKING_INDEX_PREFIX); + if (partOfDataStream) { + index = index.substring(DataStream.BACKING_INDEX_PREFIX.length()); + } + renamedIndex = index.replaceAll(request.renamePattern(), request.renameReplacement()); + if (partOfDataStream) { + renamedIndex = DataStream.BACKING_INDEX_PREFIX + renamedIndex; + } + } + return renamedIndex; + } + /** * Checks that snapshots can be restored and have compatible version * diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 8ad02784089..35c7177b7db 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -53,6 +53,8 @@ import java.util.stream.Collectors; */ public final class SnapshotInfo implements Comparable, ToXContent, Writeable { + public static final Version DATA_STREAMS_IN_SNAPSHOT = Version.V_7_9_0; + public static final String CONTEXT_MODE_PARAM = "context_mode"; public static final String CONTEXT_MODE_SNAPSHOT = "SNAPSHOT"; public static final Version METADATA_FIELD_INTRODUCED = Version.V_7_3_0; @@ -60,6 +62,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String SNAPSHOT = "snapshot"; private static final String UUID = "uuid"; private static final String INDICES = "indices"; + private static final String DATA_STREAMS = "data_streams"; private static final String STATE = "state"; private static final String REASON = "reason"; private static final String START_TIME = "start_time"; @@ -92,6 +95,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, private String state = null; private String reason = null; private List indices = null; + private List dataStreams = null; private long startTime = 0L; private long endTime = 0L; private ShardStatsBuilder shardStatsBuilder = null; @@ -120,6 +124,10 @@ public final class SnapshotInfo implements Comparable, ToXContent, this.indices = indices; } + private void setDataStreams(List dataStreams) { + this.dataStreams = dataStreams; + } + private void setStartTime(long startTime) { this.startTime = startTime; } @@ -155,6 +163,10 @@ public final class SnapshotInfo implements Comparable, ToXContent, indices = Collections.emptyList(); } + if (dataStreams == null) { + dataStreams = Collections.emptyList(); + } + SnapshotState snapshotState = state == null ? null : SnapshotState.valueOf(state); Version version = this.version == -1 ? Version.CURRENT : Version.fromId(this.version); @@ -165,7 +177,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, shardFailures = new ArrayList<>(); } - return new SnapshotInfo(snapshotId, indices, snapshotState, reason, version, startTime, endTime, + return new SnapshotInfo(snapshotId, indices, dataStreams, snapshotState, reason, version, startTime, endTime, totalShards, successfulShards, shardFailures, includeGlobalState, userMetadata); } } @@ -203,6 +215,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, SNAPSHOT_INFO_PARSER.declareString(SnapshotInfoBuilder::setState, new ParseField(STATE)); SNAPSHOT_INFO_PARSER.declareString(SnapshotInfoBuilder::setReason, new ParseField(REASON)); SNAPSHOT_INFO_PARSER.declareStringArray(SnapshotInfoBuilder::setIndices, new ParseField(INDICES)); + SNAPSHOT_INFO_PARSER.declareStringArray(SnapshotInfoBuilder::setDataStreams, new ParseField(DATA_STREAMS)); SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setStartTime, new ParseField(START_TIME_IN_MILLIS)); SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setEndTime, new ParseField(END_TIME_IN_MILLIS)); SNAPSHOT_INFO_PARSER.declareObject(SnapshotInfoBuilder::setShardStatsBuilder, SHARD_STATS_PARSER, new ParseField(SHARDS)); @@ -226,6 +239,8 @@ public final class SnapshotInfo implements Comparable, ToXContent, private final List indices; + private final List dataStreams; + private final long startTime; private final long endTime; @@ -245,34 +260,33 @@ public final class SnapshotInfo implements Comparable, ToXContent, private final List shardFailures; - public SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state) { - this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, - Collections.emptyList(), null, null); + public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null); } - public SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state, Version version) { - this(snapshotId, indices, state, null, version, 0L, 0L, 0, 0, - Collections.emptyList(), null, null); + public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { this(entry.snapshot().getSnapshotId(), - entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), SnapshotState.IN_PROGRESS, null, Version.CURRENT, - entry.startTime(), 0L, 0, 0, Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata()); + entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), entry.dataStreams(), SnapshotState.IN_PROGRESS, + null, Version.CURRENT, entry.startTime(), 0L, 0, 0, Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata()); } - public SnapshotInfo(SnapshotId snapshotId, List indices, long startTime, String reason, long endTime, - int totalShards, List shardFailures, Boolean includeGlobalState, + public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, long startTime, String reason, + long endTime, int totalShards, List shardFailures, Boolean includeGlobalState, Map userMetadata) { - this(snapshotId, indices, snapshotState(reason, shardFailures), reason, Version.CURRENT, + this(snapshotId, indices, dataStreams, snapshotState(reason, shardFailures), reason, Version.CURRENT, startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures, includeGlobalState, userMetadata); } - private SnapshotInfo(SnapshotId snapshotId, List indices, SnapshotState state, String reason, Version version, - long startTime, long endTime, int totalShards, int successfulShards, List shardFailures, - Boolean includeGlobalState, Map userMetadata) { + private SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, String reason, + Version version, long startTime, long endTime, int totalShards, int successfulShards, + List shardFailures, Boolean includeGlobalState, Map userMetadata) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); + this.dataStreams = Collections.unmodifiableList(Objects.requireNonNull(dataStreams)); this.state = state; this.reason = reason; this.version = version; @@ -321,6 +335,11 @@ public final class SnapshotInfo implements Comparable, ToXContent, } else { userMetadata = null; } + if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { + dataStreams = in.readStringList(); + } else { + dataStreams = Collections.emptyList(); + } } /** @@ -328,7 +347,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, * all information stripped out except the snapshot id, state, and indices. */ public SnapshotInfo basic() { - return new SnapshotInfo(snapshotId, indices, state); + return new SnapshotInfo(snapshotId, indices, Collections.emptyList(), state); } /** @@ -369,6 +388,13 @@ public final class SnapshotInfo implements Comparable, ToXContent, return indices; } + /** + * @return list of data streams that were included in this snapshot. + */ + public List dataStreams(){ + return dataStreams; + } + /** * Returns time when snapshot started; a value of {@code 0L} will be returned if * {@link #state()} returns {@code null}. @@ -514,6 +540,11 @@ public final class SnapshotInfo implements Comparable, ToXContent, builder.value(index); } builder.endArray(); + builder.startArray(DATA_STREAMS); + for (String dataStream : dataStreams) { + builder.value(dataStream); + } + builder.endArray(); if (includeGlobalState != null) { builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); } @@ -564,6 +595,11 @@ public final class SnapshotInfo implements Comparable, ToXContent, builder.value(index); } builder.endArray(); + builder.startArray(DATA_STREAMS); + for (String dataStream : dataStreams) { + builder.value(dataStream); + } + builder.endArray(); builder.field(STATE, state); if (reason != null) { builder.field(REASON, reason); @@ -597,6 +633,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, SnapshotState state = SnapshotState.IN_PROGRESS; String reason = null; List indices = Collections.emptyList(); + List dataStreams = Collections.emptyList(); long startTime = 0; long endTime = 0; int totalShards = 0; @@ -641,7 +678,12 @@ public final class SnapshotInfo implements Comparable, ToXContent, includeGlobalState = parser.booleanValue(); } } else if (token == XContentParser.Token.START_ARRAY) { - if (INDICES.equals(currentFieldName)) { + if (DATA_STREAMS.equals(currentFieldName)){ + dataStreams = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + dataStreams.add(parser.text()); + } + } else if (INDICES.equals(currentFieldName)) { ArrayList indicesArray = new ArrayList<>(); while (parser.nextToken() != XContentParser.Token.END_ARRAY) { indicesArray.add(parser.text()); @@ -677,6 +719,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, } return new SnapshotInfo(new SnapshotId(name, uuid), indices, + dataStreams, state, reason, version, @@ -722,6 +765,9 @@ public final class SnapshotInfo implements Comparable, ToXContent, } if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { out.writeMap(userMetadata); + if(out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)){ + out.writeStringCollection(dataStreams); + } } } @@ -750,6 +796,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, state == that.state && Objects.equals(reason, that.reason) && Objects.equals(indices, that.indices) && + Objects.equals(dataStreams, that.dataStreams) && Objects.equals(includeGlobalState, that.includeGlobalState) && Objects.equals(version, that.version) && Objects.equals(shardFailures, that.shardFailures) && @@ -759,7 +806,7 @@ public final class SnapshotInfo implements Comparable, ToXContent, @Override public int hashCode() { - return Objects.hash(snapshotId, state, reason, indices, startTime, endTime, + return Objects.hash(snapshotId, state, reason, indices, dataStreams, startTime, endTime, totalShards, successfulShards, includeGlobalState, version, shardFailures, userMetadata); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 343bbff4afc..177505eabee 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -207,13 +208,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } // Store newSnapshot here to be processed in clusterStateProcessed indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, - request.indicesOptions(), request.indices())); + request.indicesOptions(), true, request.indices())); + + Map allDataStreams = currentState.metadata().dataStreams(); + List dataStreams; + if (request.includeGlobalState()) { + dataStreams = new ArrayList<>(allDataStreams.keySet()); + } else { + dataStreams = indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()); + } + logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); newSnapshot = new SnapshotsInProgress.Entry( new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(), State.INIT, Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot + dataStreams, threadPool.absoluteTimeInMillis(), RepositoryData.UNKNOWN_REPO_GEN, null, @@ -508,6 +519,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus builder.put(indexMetadata, false); } } + + Map dataStreams = new HashMap<>(); + for (String dataStreamName : snapshot.dataStreams()) { + DataStream dataStream = metadata.dataStreams().get(dataStreamName); + if (dataStream == null) { + assert snapshot.partial() : "Data stream [" + dataStreamName + + "] was deleted during a snapshot but snapshot was not partial."; + } else { + dataStreams.put(dataStreamName, dataStream); + } + } + builder.dataStreams(dataStreams); metadata = builder.build(); } return metadata; @@ -1475,6 +1498,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus return builder.build(); } + /** + * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the + * indices-to-check set. + */ + public static Set snapshottingDataStreams(final ClusterState currentState, final Set dataStreamsToCheck) { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots == null) { + return emptySet(); + } + + Map dataStreams = currentState.metadata().dataStreams(); + return snapshots.entries().stream() + .filter(e -> e.partial() == false) + .flatMap(e -> e.dataStreams().stream()) + .filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds)) + .collect(Collectors.toSet()); + } + /** * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set. */ diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 5c4bc2ad2d7..520f1dcce94 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -51,6 +51,10 @@ public class CreateSnapshotResponseTests extends AbstractXContentTestCase indices = new ArrayList<>(); indices.add("test0"); indices.add("test1"); + + List dataStreams = new ArrayList<>(); + dataStreams.add("test0"); + dataStreams.add("test1"); String reason = "reason"; long startTime = System.currentTimeMillis(); long endTime = startTime + 10000; @@ -66,7 +70,7 @@ public class CreateSnapshotResponseTests extends AbstractXContentTestCase shardFailures = Collections.singletonList(new SnapshotShardFailure("node-id", shardId, "reason")); - snapshots.add(new SnapshotInfo(snapshotId, Arrays.asList("indice1", "indice2"), System.currentTimeMillis(), reason, - System.currentTimeMillis(), randomIntBetween(2, 3), shardFailures, randomBoolean(), + snapshots.add(new SnapshotInfo(snapshotId, Arrays.asList("index1", "index2"), Collections.singletonList("ds"), + System.currentTimeMillis(), reason, System.currentTimeMillis(), randomIntBetween(2, 3), shardFailures, randomBoolean(), SnapshotInfoTests.randomUserMetadata())); } return new GetSnapshotsResponse(snapshots); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index ddf6aed6425..ff35019baf8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -32,9 +33,14 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -85,6 +91,32 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas } } + public void testDeleteSnapshottingDataStream() { + final String dataStreamName = "my-data-stream1"; + final String dataStreamName2 = "my-data-stream2"; + final List otherIndices = randomSubsetOf(Arrays.asList("foo", "bar", "baz")); + + ClusterState cs = getClusterStateWithDataStreams(Arrays.asList(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)), + otherIndices); + SnapshotsInProgress snapshotsInProgress = new SnapshotsInProgress(Arrays.asList( + createEntry(dataStreamName, "repo1", false), + createEntry(dataStreamName2, "repo2", true))); + ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build(); + + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + SnapshotInProgressException e = expectThrows(SnapshotInProgressException.class, + () -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), snapshotCs, req)); + + assertThat(e.getMessage(), equalTo("Cannot delete data streams that are being snapshotted: [my-data-stream1]. Try again after " + + "snapshot finishes or cancel the currently running snapshot.")); + } + + private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) { + return new SnapshotsInProgress.Entry(new Snapshot(repo, new SnapshotId("", "")), false, partial, + SnapshotsInProgress.State.STARTED, Collections.emptyList(), Collections.singletonList(dataStreamName), 0, 1, null, null, null + , null); + } + public void testDeleteNonexistentDataStream() { final String dataStreamName = "my-data-stream"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 0314b6413e4..8d8be06587b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -1944,4 +1944,45 @@ public class IndexNameExpressionResolverTests extends ESTestCase { assertThat(result[1].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStream1, 2))); assertThat(result[2].getName(), equalTo("logs-foobarbaz-0")); } + + public void testDataStreamsNames() { + final String dataStream1 = "logs-foobar"; + final String dataStream2 = "other-foobar"; + IndexMetadata index1 = createBackingIndex(dataStream1, 1).build(); + IndexMetadata index2 = createBackingIndex(dataStream1, 2).build(); + IndexMetadata justAnIndex = IndexMetadata.builder("logs-foobarbaz-0") + .settings(ESTestCase.settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .putAlias(new AliasMetadata.Builder("logs-foobarbaz")) + .build(); + + IndexMetadata index3 = createBackingIndex(dataStream2, 1).build(); + IndexMetadata index4 = createBackingIndex(dataStream2, 2).build(); + + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metadata(Metadata.builder() + .put(index1, false) + .put(index2, false) + .put(index3, false) + .put(index4, false) + .put(justAnIndex, false) + .put(new DataStream(dataStream1, "ts", Arrays.asList(index1.getIndex(), index2.getIndex()))) + .put(new DataStream(dataStream2, "ts", Arrays.asList(index3.getIndex(), index4.getIndex())))).build(); + + List names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "log*"); + assertEquals(Collections.singletonList(dataStream1), names); + + names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "other*"); + assertEquals(Collections.singletonList(dataStream2), names); + + names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "*foobar"); + assertThat(names, containsInAnyOrder(dataStream1, dataStream2)); + + names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "notmatched"); + assertThat(names, empty()); + + names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), index3.getIndex().getName()); + assertThat(names, empty()); + } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java new file mode 100644 index 00000000000..5c45c5ea74a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.List; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RestoreServiceTests extends ESTestCase { + + public void testUpdateDataStream() { + String dataStreamName = "data-stream-1"; + String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + List indices = Collections.singletonList(new Index(backingIndexName, "uuid")); + + DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices); + + Metadata.Builder metadata = mock(Metadata.Builder.class); + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(metadata.get(eq(backingIndexName))).thenReturn(indexMetadata); + Index updatedIndex = new Index(backingIndexName, "uuid2"); + when(indexMetadata.getIndex()).thenReturn(updatedIndex); + + RestoreSnapshotRequest request = new RestoreSnapshotRequest(); + + DataStream updateDataStream = RestoreService.updateDataStream(dataStream, metadata, request); + + assertEquals(dataStreamName, updateDataStream.getName()); + assertEquals(Collections.singletonList(updatedIndex), updateDataStream.getIndices()); + } + + public void testUpdateDataStreamRename() { + String dataStreamName = "data-stream-1"; + String renamedDataStreamName = "data-stream-2"; + String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String renamedBackingIndexName = DataStream.getDefaultBackingIndexName(renamedDataStreamName, 1); + List indices = Collections.singletonList(new Index(backingIndexName, "uuid")); + + DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices); + + Metadata.Builder metadata = mock(Metadata.Builder.class); + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(metadata.get(eq(renamedBackingIndexName))).thenReturn(indexMetadata); + Index renamedIndex = new Index(renamedBackingIndexName, "uuid2"); + when(indexMetadata.getIndex()).thenReturn(renamedIndex); + + RestoreSnapshotRequest request = new RestoreSnapshotRequest().renamePattern("data-stream-1").renameReplacement("data-stream-2"); + + DataStream renamedDataStream = RestoreService.updateDataStream(dataStream, metadata, request); + + assertEquals(renamedDataStreamName, renamedDataStream.getName()); + assertEquals(Collections.singletonList(renamedIndex), renamedDataStream.getIndices()); + } + + public void testPrefixNotChanged() { + String dataStreamName = "ds-000001"; + String renamedDataStreamName = "ds2-000001"; + String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + String renamedBackingIndexName = DataStream.getDefaultBackingIndexName(renamedDataStreamName, 1); + List indices = Collections.singletonList(new Index(backingIndexName, "uuid")); + + DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices); + + Metadata.Builder metadata = mock(Metadata.Builder.class); + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(metadata.get(eq(renamedBackingIndexName))).thenReturn(indexMetadata); + Index renamedIndex = new Index(renamedBackingIndexName, "uuid2"); + when(indexMetadata.getIndex()).thenReturn(renamedIndex); + + RestoreSnapshotRequest request = new RestoreSnapshotRequest().renamePattern("ds-").renameReplacement("ds2-"); + + DataStream renamedDataStream = RestoreService.updateDataStream(dataStream, metadata, request); + + assertEquals(renamedDataStreamName, renamedDataStream.getName()); + assertEquals(Collections.singletonList(renamedIndex), renamedDataStream.getIndices()); + + request = new RestoreSnapshotRequest().renamePattern("ds-000001").renameReplacement("ds2-000001"); + + renamedDataStream = RestoreService.updateDataStream(dataStream, metadata, request); + + assertEquals(renamedDataStreamName, renamedDataStream.getName()); + assertEquals(Collections.singletonList(renamedIndex), renamedDataStream.getIndices()); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTests.java index 723ce4ae751..8c5eb8c695e 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoTests.java @@ -36,6 +36,8 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase indices = Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20))); + List dataStreams = Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20))); + String reason = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 15); long startTime = randomNonNegativeLong(); @@ -57,7 +59,7 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase userMetadata = randomUserMetadata(); - return new SnapshotInfo(snapshotId, indices, startTime, reason, endTime, totalShards, shardFailures, + return new SnapshotInfo(snapshotId, indices, dataStreams, startTime, reason, endTime, totalShards, shardFailures, includeGlobalState, userMetadata); } @@ -68,34 +70,34 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase randomAlphaOfLength(5)), randomValueOtherThan(instance.snapshotId().getUUID(), () -> randomAlphaOfLength(5))); - return new SnapshotInfo(snapshotId, instance.indices(), instance.startTime(), instance.reason(), + return new SnapshotInfo(snapshotId, instance.indices(), instance.dataStreams(), instance.startTime(), instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata()); case 1: int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10)); List indices = Arrays.asList(randomArray(indicesSize, indicesSize, String[]::new, () -> randomAlphaOfLengthBetween(2, 20))); - return new SnapshotInfo(instance.snapshotId(), indices, instance.startTime(), instance.reason(), + return new SnapshotInfo(instance.snapshotId(), indices, instance.dataStreams(), instance.startTime(), instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata()); case 2: - return new SnapshotInfo(instance.snapshotId(), instance.indices(), + return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), randomValueOtherThan(instance.startTime(), ESTestCase::randomNonNegativeLong), instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata()); case 3: - return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(), + return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), instance.startTime(), randomValueOtherThan(instance.reason(), () -> randomAlphaOfLengthBetween(5, 15)), instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata()); case 4: - return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(), instance.reason(), - randomValueOtherThan(instance.endTime(), ESTestCase::randomNonNegativeLong), instance.totalShards(), - instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata()); + return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), + instance.startTime(), instance.reason(), randomValueOtherThan(instance.endTime(), ESTestCase::randomNonNegativeLong), + instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata()); case 5: int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100)); int failedShards = randomIntBetween(0, totalShards); @@ -108,16 +110,23 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase dataStreams = randomValueOtherThan(instance.dataStreams(), + () -> Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20)))); + return new SnapshotInfo(instance.snapshotId(), instance.indices(), dataStreams, + instance.startTime(), instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(), + instance.includeGlobalState(), instance.userMetadata()); default: throw new IllegalArgumentException("invalid randomization case"); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 01b81cebb98..6a91856e0f0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.test.AbstractDiffableWireSerializationTestCase; import org.elasticsearch.test.VersionUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -66,6 +67,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); final List esIndices = indices.stream().map(i -> new Index(i.getName(), randomAlphaOfLength(10))).collect(Collectors.toList()); + List dataStreams = Arrays.asList(generateRandomStringArray(10, 10, false)); for (Index idx : esIndices) { int shardsCount = randomIntBetween(1, 10); for (int j = 0; j < shardsCount; j++) { @@ -77,7 +79,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS } } ImmutableOpenMap shards = builder.build(); - return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, + return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random())); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 40449f6ec72..866803d8a87 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -172,11 +172,13 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit Client remoteClient = getRemoteClusterClient(); ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetadata(true).setNodes(true) .get(ccrSettings.getRecoveryActionTimeout()); - ImmutableOpenMap indicesMap = response.getState().metadata().indices(); + Metadata metadata = response.getState().metadata(); + ImmutableOpenMap indicesMap = metadata.indices(); ArrayList indices = new ArrayList<>(indicesMap.size()); indicesMap.keysIt().forEachRemaining(indices::add); - return new SnapshotInfo(snapshotId, indices, SnapshotState.SUCCESS, response.getState().getNodes().getMaxNodeVersion()); + return new SnapshotInfo(snapshotId, indices, new ArrayList<>(metadata.dataStreams().keySet()), SnapshotState.SUCCESS, + response.getState().getNodes().getMaxNodeVersion()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionConfigurationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionConfigurationTests.java index 9659ca0fe2f..fa79dd7d4ee 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionConfigurationTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionConfigurationTests.java @@ -249,6 +249,7 @@ public class SnapshotRetentionConfigurationTests extends ESTestCase { final int totalShards = between(1,20); SnapshotInfo snapInfo = new SnapshotInfo(new SnapshotId("snap-" + randomAlphaOfLength(3), "uuid"), Collections.singletonList("foo"), + Collections.singletonList("bar"), startTime, null, startTime + between(1, 10000), @@ -280,6 +281,7 @@ public class SnapshotRetentionConfigurationTests extends ESTestCase { assert failureCount == failures.size(); SnapshotInfo snapInfo = new SnapshotInfo(new SnapshotId("snap-fail-" + randomAlphaOfLength(3), "uuid-fail"), Collections.singletonList("foo-fail"), + Collections.singletonList("bar-fail"), startTime, "forced-failure", startTime + between(1, 10000), @@ -303,6 +305,7 @@ public class SnapshotRetentionConfigurationTests extends ESTestCase { assert failureCount == failures.size(); SnapshotInfo snapInfo = new SnapshotInfo(new SnapshotId("snap-fail-" + randomAlphaOfLength(3), "uuid-fail"), Collections.singletonList("foo-fail"), + Collections.singletonList("bar-fail"), startTime, null, startTime + between(1, 10000), diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java index 3a07319b2ba..2ec37510273 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java @@ -15,7 +15,10 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.indexlifecycle.RolloverAction; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; @@ -48,10 +51,11 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX; -import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -458,6 +462,48 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase { } } + public void testDataStreams() throws Exception { + String dataStreamName = "ds-test"; + String repoId = "ds-repo"; + String policyName = "ds-policy"; + + String mapping = "{\n" + + " \"properties\": {\n" + + " \"@timestamp\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + " }"; + Template template = new Template(null, new CompressedXContent(mapping), null); + createComposableTemplate(client(), "ds-template", dataStreamName, template); + + client().performRequest(new Request("PUT", "_data_stream/" + dataStreamName)); + + // Create a snapshot repo + initializeRepo(repoId); + + createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, repoId, dataStreamName, true); + + final String snapshotName = executePolicy(policyName); + + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map snapshot = extractSnapshot(snapshotResponseMap, snapshotName); + assertEquals(Collections.singletonList(dataStreamName), snapshot.get("data_streams")); + assertEquals(Collections.singletonList(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), snapshot.get("indices")); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } + }); + } + @SuppressWarnings("unchecked") public void testSLMXpackInfo() { Map features = (Map) getLocation("/_xpack").get("features"); @@ -546,10 +592,14 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase { @SuppressWarnings("unchecked") private static Map extractMetadata(Map snapshotResponseMap, String snapshotPrefix) { + return (Map) extractSnapshot(snapshotResponseMap, snapshotPrefix).get("metadata"); + } + + @SuppressWarnings("unchecked") + private static Map extractSnapshot(Map snapshotResponseMap, String snapshotPrefix) { List> snapshots = ((List>) snapshotResponseMap.get("snapshots")); return snapshots.stream() .filter(snapshot -> ((String) snapshot.get("snapshot")).startsWith(snapshotPrefix)) - .map(snapshot -> (Map) snapshot.get("metadata")) .findFirst() .orElse(null); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java index 6d280046a1e..4764316404e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java @@ -242,6 +242,7 @@ public class SnapshotLifecycleTaskTests extends ESTestCase { new SnapshotInfo( new SnapshotId(req.snapshot(), "uuid"), Arrays.asList(req.indices()), + Collections.emptyList(), startTime, "snapshot started", endTime, diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index cd34f1f644b..664012007f7 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -124,33 +124,33 @@ public class SnapshotRetentionTaskTests extends ESTestCase { // Test when user metadata is null SnapshotInfo info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, null); + Collections.emptyList(),0L, null, 1L, 1, Collections.emptyList(), true, null); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); // Test when no retention is configured - info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, null); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyWithNoRetentionMap), equalTo(false)); // Test when user metadata is a map that doesn't contain "policy" - info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("foo", "bar")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); // Test with an ancient snapshot that should be expunged - info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); // Test with a snapshot that's start date is old enough to be expunged (but the finish date is not) long time = System.currentTimeMillis() - TimeValue.timeValueDays(30).millis() - 1; - info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(), time, null, time + TimeValue.timeValueDays(4).millis(), 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); // Test with a fresh snapshot that should not be expunged - info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(), System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); @@ -178,9 +178,9 @@ public class SnapshotRetentionTaskTests extends ESTestCase { ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo eligibleSnapshot = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), - System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, + Collections.emptyList(), System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); Set deleted = ConcurrentHashMap.newKeySet(); @@ -266,15 +266,15 @@ public class SnapshotRetentionTaskTests extends ESTestCase { ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo snap1 = new SnapshotInfo(new SnapshotId("name1", "uuid1"), Collections.singletonList("index"), - 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap2 = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), - 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + Collections.emptyList(), 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap3 = new SnapshotInfo(new SnapshotId("name3", "uuid3"), Collections.singletonList("index"), - 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + Collections.emptyList(), 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap4 = new SnapshotInfo(new SnapshotId("name4", "uuid4"), Collections.singletonList("index"), - 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + Collections.emptyList(), 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final SnapshotInfo snap5 = new SnapshotInfo(new SnapshotId("name5", "uuid5"), Collections.singletonList("index"), - 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); + Collections.emptyList(), 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId)); final Set deleted = ConcurrentHashMap.newKeySet(); // We're expected two deletions before they hit the "taken too long" test, so have a latch of 2