[7.x] Add support for snapshot and restore to data streams (#57675) (#58371)

* Add support for snapshot and restore to data streams (#57675)

This change adds support for including data streams in snapshots.
Names are provided in indices field (the same way as in other APIs), wildcards are supported.
If rename pattern is specified it renames both data streams and backing indices.
It also adds test to make sure SLM works correctly.

Closes #57127

Relates to #53100

* version fix

* compilation fix

* compilation fix

* remove unused changes

* compilation fix

* test fix
This commit is contained in:
Przemko Robakowski 2020-06-19 22:41:51 +02:00 committed by GitHub
parent bf8641aa15
commit a44dad9fbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 813 additions and 82 deletions

View File

@ -98,11 +98,11 @@ public class DataStreamIT extends ESIntegTestCase {
}
public void testBasicScenario() throws Exception {
createIndexTemplate("id1", "metrics-foo*", "@timestamp1");
createIndexTemplate("id1", "@timestamp1", "metrics-foo*");
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
client().admin().indices().createDataStream(createDataStreamRequest).get();
createIndexTemplate("id2", "metrics-bar*", "@timestamp2");
createIndexTemplate("id2", "@timestamp2", "metrics-bar*");
createDataStreamRequest = new CreateDataStreamAction.Request("metrics-bar");
client().admin().indices().createDataStream(createDataStreamRequest).get();
@ -186,7 +186,7 @@ public class DataStreamIT extends ESIntegTestCase {
}
public void testOtherWriteOps() throws Exception {
createIndexTemplate("id", "metrics-foobar*", "@timestamp1");
createIndexTemplate("id", "@timestamp1", "metrics-foobar*");
String dataStreamName = "metrics-foobar";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().admin().indices().createDataStream(createDataStreamRequest).get();
@ -351,8 +351,8 @@ public class DataStreamIT extends ESIntegTestCase {
"[date, date_nanos], but instead found type [keyword]"));
}
public void testDataStreamsResolvability() throws Exception {
createIndexTemplate("id", "logs-*", "ts");
public void testResolvabilityOfDataStreamsInAPIs() throws Exception {
createIndexTemplate("id", "ts", "logs-*");
String dataStreamName = "logs-foobar";
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(dataStreamName);
client().admin().indices().createDataStream(request).actionGet();
@ -413,7 +413,7 @@ public class DataStreamIT extends ESIntegTestCase {
}
public void testCannotDeleteComposableTemplateUsedByDataStream() throws Exception {
createIndexTemplate("id", "metrics-foobar*", "@timestamp1");
createIndexTemplate("id", "@timestamp1", "metrics-foobar*");
String dataStreamName = "metrics-foobar-baz";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().admin().indices().createDataStream(createDataStreamRequest).get();
@ -436,7 +436,7 @@ public class DataStreamIT extends ESIntegTestCase {
}
public void testAliasActionsFailOnDataStreams() throws Exception {
createIndexTemplate("id1", "metrics-foo*", "@timestamp1");
createIndexTemplate("id1", "@timestamp1", "metrics-foo*");
String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().admin().indices().createDataStream(createDataStreamRequest).get();
@ -449,7 +449,7 @@ public class DataStreamIT extends ESIntegTestCase {
}
public void testAliasActionsFailOnDataStreamBackingIndices() throws Exception {
createIndexTemplate("id1", "metrics-foo*", "@timestamp1");
createIndexTemplate("id1", "@timestamp1", "metrics-foo*");
String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().admin().indices().createDataStream(createDataStreamRequest).get();
@ -539,11 +539,11 @@ public class DataStreamIT extends ESIntegTestCase {
"] matches a data stream, specify the corresponding concrete indices instead."));
}
static void createIndexTemplate(String id, String pattern, String timestampFieldName) throws IOException {
public static void createIndexTemplate(String id, String timestampFieldName, String... pattern) throws IOException {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
new ComposableIndexTemplate(
Collections.singletonList(pattern),
Arrays.asList(pattern),
new Template(null,
new CompressedXContent(MetadataCreateDataStreamServiceTests.generateMapping(timestampFieldName)), null),
null, null, null, null,

View File

@ -0,0 +1,218 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.indices.DataStreamIT;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.junit.Before;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
private static final String DS_BACKING_INDEX_NAME = DataStream.getDefaultBackingIndexName("ds", 1);
private static final String DS2_BACKING_INDEX_NAME = DataStream.getDefaultBackingIndexName("ds2", 1);
private static final Map<String, Integer> DOCUMENT_SOURCE = Collections.singletonMap("@timestamp", 123);
public static final String REPO = "repo";
public static final String SNAPSHOT = "snap";
private Client client;
private String id;
@Before
public void setup() throws Exception {
client = client();
Path location = randomRepoPath();
createRepository(REPO, "fs", location);
DataStreamIT.createIndexTemplate("t1", "@timestamp", "ds", "other-ds");
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request("ds");
AcknowledgedResponse response = client.admin().indices().createDataStream(request).get();
assertTrue(response.isAcknowledged());
request = new CreateDataStreamAction.Request("other-ds");
response = client.admin().indices().createDataStream(request).get();
assertTrue(response.isAcknowledged());
IndexResponse indexResponse = client.prepareIndex("ds", "_doc")
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(DOCUMENT_SOURCE)
.get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
id = indexResponse.getId();
}
public void testSnapshotAndRestore() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
GetSnapshotsResponse snapshot = client.admin().cluster().prepareGetSnapshots(REPO).setSnapshots(SNAPSHOT).get();
List<SnapshotInfo> snap = snapshot.getSnapshots();
assertEquals(1, snap.size());
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), snap.get(0).indices());
assertTrue(client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request("ds")).get().isAcknowledged());
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.get();
assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
assertEquals(1, hits.length);
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds")).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
}
public void testRename() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
expectThrows(SnapshotRestoreException.class, () -> client.admin().cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.get());
client.admin().cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.setRenamePattern("ds")
.setRenameReplacement("ds2")
.get();
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
assertEquals(DOCUMENT_SOURCE, client.prepareSearch("ds2").get().getHits().getHits()[0].getSourceAsMap());
assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS2_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
}
public void testWildcards() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
.prepareCreateSnapshot(REPO, "snap2")
.setWaitForCompletion(true)
.setIndices("d*")
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster()
.prepareRestoreSnapshot(REPO, "snap2")
.setWaitForCompletion(true)
.setIndices("d*")
.setRenamePattern("ds")
.setRenameReplacement("ds2")
.get();
assertEquals(RestStatus.OK, restoreSnapshotResponse.status());
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
}
public void testDataStreamNotStoredWhenIndexRequested() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
.prepareCreateSnapshot(REPO, "snap2")
.setWaitForCompletion(true)
.setIndices(DS_BACKING_INDEX_NAME)
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
expectThrows(Exception.class, () -> client.admin().cluster()
.prepareRestoreSnapshot(REPO, "snap2")
.setWaitForCompletion(true)
.setIndices("ds")
.get());
}
public void testDataStreamNotRestoredWhenIndexRequested() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
.prepareCreateSnapshot(REPO, "snap2")
.setWaitForCompletion(true)
.setIndices("ds")
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
assertTrue(client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request("ds")).get().isAcknowledged());
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster()
.prepareRestoreSnapshot(REPO, "snap2")
.setWaitForCompletion(true)
.setIndices(".ds-ds-*")
.get();
assertEquals(RestStatus.OK, restoreSnapshotResponse.status());
GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request("ds");
expectThrows(ResourceNotFoundException.class, () -> client.admin().indices().getDataStreams(getRequest).actionGet());
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
@ -34,6 +35,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@ -52,6 +54,7 @@ import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
@ -78,6 +81,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.DataStreamIT;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.ingest.IngestTestPlugin;
@ -132,6 +136,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertRequestBuilderThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
@ -2408,6 +2413,61 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
}
public void testDeleteDataStreamDuringSnapshot() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true)));
String dataStream = "test-ds";
DataStreamIT.createIndexTemplate("dst", "@timestamp", dataStream);
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
client.prepareIndex(dataStream, "_doc")
.setOpType(DocWriteRequest.OpType.CREATE)
.setId(Integer.toString(i))
.setSource(Collections.singletonMap("k", "v"))
.execute().actionGet();
}
refresh();
assertThat(client.prepareSearch(dataStream).setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices(dataStream).setWaitForCompletion(true).setPartial(false).execute();
logger.info("--> wait for block to kick in");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
// non-partial snapshots do not allow delete operations on data streams where snapshot has not been completed
try {
logger.info("--> delete index while non-partial snapshot is running");
client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(dataStream)).actionGet();
fail("Expected deleting index to fail during snapshot");
} catch (SnapshotInProgressException e) {
assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: [test-ds"));
} finally {
logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");
}
logger.info("--> waiting for snapshot to finish");
CreateSnapshotResponse createSnapshotResponse = future.get();
logger.info("Snapshot successfully completed");
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo((SnapshotState.SUCCESS)));
assertThat(snapshotInfo.dataStreams(), contains(dataStream));
assertThat(snapshotInfo.indices(), contains(DataStream.getDefaultBackingIndexName(dataStream, 1)));
}
public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

View File

@ -255,7 +255,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
for (SnapshotId snapshotId : toResolve) {
final List<String> indices = snapshotsToIndices.getOrDefault(snapshotId, Collections.emptyList());
CollectionUtil.timSort(indices);
snapshotInfos.add(new SnapshotInfo(snapshotId, indices, repositoryData.getSnapshotState(snapshotId)));
snapshotInfos.add(new SnapshotInfo(snapshotId, indices, Collections.emptyList(), repositoryData.getSnapshotState(snapshotId)));
}
CollectionUtil.timSort(snapshotInfos);
return unmodifiableList(snapshotInfos);

View File

@ -46,6 +46,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -173,6 +175,13 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
}
throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found");
}
Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(currentState, dataStreams);
if (snapshottingDataStreams.isEmpty() == false) {
throw new SnapshotInProgressException("Cannot delete data streams that are being snapshotted: " + snapshottingDataStreams +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
List<String> dataStreamsToRemove = new ArrayList<>();
Set<Index> backingIndicesToRemove = new HashSet<>();
for (String dataStreamName : dataStreams) {

View File

@ -52,6 +52,8 @@ import java.util.stream.Collectors;
import static org.elasticsearch.snapshots.SnapshotInfo.METADATA_FIELD_INTRODUCED;
import static org.elasticsearch.snapshots.SnapshotInfo.DATA_STREAMS_IN_SNAPSHOT;
/**
* Meta data about snapshots that are currently executing
*/
@ -92,6 +94,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final boolean partial;
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<IndexId> indices;
private final List<String> dataStreams;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
@ -101,13 +104,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
@Nullable private final String failure;
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure, Map<String, Object> userMetadata, Version version) {
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
this.partial = partial;
this.indices = indices;
this.dataStreams = dataStreams;
this.startTime = startTime;
if (shards == null) {
this.shards = ImmutableOpenMap.of();
@ -136,26 +141,33 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return true;
}
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Map<String, Object> userMetadata, Version version) {
this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
null, userMetadata, version);
}
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata, Version version) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
version);
this(snapshot, includeGlobalState, partial, state, indices, Collections.emptyList(), startTime, repositoryStateId, shards,
null, userMetadata, version);
}
public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Version version, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
failure, entry.userMetadata, version);
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.dataStreams, entry.startTime,
repositoryStateId, shards, failure, entry.userMetadata, version);
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.dataStreams, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.version);
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.dataStreams, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.version);
}
@ -204,6 +216,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return startTime;
}
public List<String> dataStreams() {
return dataStreams;
}
@Override
public long repositoryStateId() {
return repositoryStateId;
@ -293,6 +309,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
}
}
builder.endArray();
builder.array(DATA_STREAMS, dataStreams.toArray(new String[0]));
builder.endObject();
return builder;
}
@ -552,11 +569,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
} else {
version = SnapshotsService.OLD_SNAPSHOT_FORMAT;
}
List<String> dataStreams;
if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
dataStreams = in.readStringList();
} else {
dataStreams = Collections.emptyList();
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
state,
Collections.unmodifiableList(indexBuilder),
dataStreams,
startTime,
repositoryStateId,
builder.build(),
@ -603,6 +628,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
} else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(SnapshotsService.useShardGenerations(entry.version));
}
if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
out.writeStringCollection(entry.dataStreams);
}
}
}
@ -614,6 +642,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private static final String PARTIAL = "partial";
private static final String STATE = "state";
private static final String INDICES = "indices";
private static final String DATA_STREAMS = "data_streams";
private static final String START_TIME_MILLIS = "start_time_millis";
private static final String START_TIME = "start_time";
private static final String REPOSITORY_STATE_ID = "repository_state_id";

View File

@ -38,6 +38,7 @@ import java.util.Objects;
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final String DATA_STREAMS_METADATA_FIELD = "data-streams";
private final String name;
private final String timeStampField;

View File

@ -121,6 +121,15 @@ public class IndexNameExpressionResolver {
return concreteIndexNames(context, indexExpressions);
}
public List<String> dataStreamNames(ClusterState state, IndicesOptions options, String... indexExpressions) {
Context context = new Context(state, options, false, false, true);
return Arrays.stream(indexExpressions)
.flatMap(expression -> WildcardExpressionResolver.matches(context, state.metadata(), expression).values().stream())
.filter(i -> i.getType() == IndexAbstraction.Type.DATA_STREAM)
.map(IndexAbstraction::getName)
.collect(Collectors.toList());
}
/**
* Translates the provided index expression into actual concrete indices, properly deduplicated.
*

View File

@ -1040,6 +1040,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
executor.execute(ActionRunnable.supply(allMetaListener, () -> {
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
new ArrayList<>(clusterMetadata.dataStreams().keySet()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);
snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@ -41,6 +42,7 @@ import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
@ -75,6 +77,7 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -82,6 +85,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -201,12 +205,38 @@ public class RestoreService implements ClusterStateApplier {
// Make sure that we can restore from this snapshot
validateSnapshotRestorable(repositoryName, snapshotInfo);
Metadata globalMetadata = null;
// Resolve the indices from the snapshot that need to be restored
final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
Map<String, DataStream> dataStreams;
List<String> requestIndices = new ArrayList<>(Arrays.asList(request.indices()));
List<String> requestedDataStreams = filterIndices(snapshotInfo.dataStreams(), requestIndices.toArray(new String[0]),
IndicesOptions.fromOptions(true, true, true, true));
if (requestedDataStreams.isEmpty()) {
dataStreams = new HashMap<>();
} else {
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
dataStreams = globalMetadata.dataStreams();
if (request.includeGlobalState() == false) {
dataStreams.keySet().retainAll(requestedDataStreams);
}
}
requestIndices.removeAll(dataStreams.keySet());
Set<String> dataStreamIndices = dataStreams.values().stream()
.flatMap(ds -> ds.getIndices().stream())
.map(Index::getName)
.collect(Collectors.toSet());
requestIndices.addAll(dataStreamIndices);
final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), requestIndices.toArray(new String[0]),
request.indicesOptions());
final Metadata.Builder metadataBuilder;
if (request.includeGlobalState()) {
metadataBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(snapshotId));
if (globalMetadata == null) {
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
}
metadataBuilder = Metadata.builder(globalMetadata);
} else {
metadataBuilder = Metadata.builder();
}
@ -220,7 +250,7 @@ public class RestoreService implements ClusterStateApplier {
// Apply renaming on index names, returning a map of names where
// the key is the renamed index and the value is the original name
final Map<String, String> indices = renamedIndices(request, indicesInSnapshot);
final Map<String, String> indices = renamedIndices(request, indicesInSnapshot, dataStreamIndices);
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
// and updating cluster metadata (global and index) as needed
@ -389,6 +419,12 @@ public class RestoreService implements ClusterStateApplier {
checkAliasNameConflicts(indices, aliases);
Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
updatedDataStreams.putAll(dataStreams.values().stream()
.map(ds -> updateDataStream(ds, mdBuilder, request))
.collect(Collectors.toMap(DataStream::getName, Function.identity())));
mdBuilder.dataStreams(updatedDataStreams);
// Restore global state if needed
if (request.includeGlobalState()) {
if (metadata.persistentSettings() != null) {
@ -560,6 +596,18 @@ public class RestoreService implements ClusterStateApplier {
}
}
//visible for testing
static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metadata, RestoreSnapshotRequest request) {
String dataStreamName = dataStream.getName();
if (request.renamePattern() != null && request.renameReplacement() != null) {
dataStreamName = dataStreamName.replaceAll(request.renamePattern(), request.renameReplacement());
}
List<Index> updatedIndices = dataStream.getIndices().stream()
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
.collect(Collectors.toList());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration());
}
public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
boolean changesMade = false;
RestoreInProgress.Builder builder = new RestoreInProgress.Builder();
@ -831,13 +879,11 @@ public class RestoreService implements ClusterStateApplier {
return failedShards;
}
private static Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices) {
private static Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices,
Set<String> dataStreamIndices) {
Map<String, String> renamedIndices = new HashMap<>();
for (String index : filteredIndices) {
String renamedIndex = index;
if (request.renameReplacement() != null && request.renamePattern() != null) {
renamedIndex = index.replaceAll(request.renamePattern(), request.renameReplacement());
}
String renamedIndex = renameIndex(index, request, dataStreamIndices.contains(index));
String previousIndex = renamedIndices.put(renamedIndex, index);
if (previousIndex != null) {
throw new SnapshotRestoreException(request.repository(), request.snapshot(),
@ -847,6 +893,21 @@ public class RestoreService implements ClusterStateApplier {
return Collections.unmodifiableMap(renamedIndices);
}
private static String renameIndex(String index, RestoreSnapshotRequest request, boolean partOfDataStream) {
String renamedIndex = index;
if (request.renameReplacement() != null && request.renamePattern() != null) {
partOfDataStream = partOfDataStream && index.startsWith(DataStream.BACKING_INDEX_PREFIX);
if (partOfDataStream) {
index = index.substring(DataStream.BACKING_INDEX_PREFIX.length());
}
renamedIndex = index.replaceAll(request.renamePattern(), request.renameReplacement());
if (partOfDataStream) {
renamedIndex = DataStream.BACKING_INDEX_PREFIX + renamedIndex;
}
}
return renamedIndex;
}
/**
* Checks that snapshots can be restored and have compatible version
*

View File

@ -53,6 +53,8 @@ import java.util.stream.Collectors;
*/
public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent, Writeable {
public static final Version DATA_STREAMS_IN_SNAPSHOT = Version.V_7_9_0;
public static final String CONTEXT_MODE_PARAM = "context_mode";
public static final String CONTEXT_MODE_SNAPSHOT = "SNAPSHOT";
public static final Version METADATA_FIELD_INTRODUCED = Version.V_7_3_0;
@ -60,6 +62,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private static final String SNAPSHOT = "snapshot";
private static final String UUID = "uuid";
private static final String INDICES = "indices";
private static final String DATA_STREAMS = "data_streams";
private static final String STATE = "state";
private static final String REASON = "reason";
private static final String START_TIME = "start_time";
@ -92,6 +95,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private String state = null;
private String reason = null;
private List<String> indices = null;
private List<String> dataStreams = null;
private long startTime = 0L;
private long endTime = 0L;
private ShardStatsBuilder shardStatsBuilder = null;
@ -120,6 +124,10 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
this.indices = indices;
}
private void setDataStreams(List<String> dataStreams) {
this.dataStreams = dataStreams;
}
private void setStartTime(long startTime) {
this.startTime = startTime;
}
@ -155,6 +163,10 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
indices = Collections.emptyList();
}
if (dataStreams == null) {
dataStreams = Collections.emptyList();
}
SnapshotState snapshotState = state == null ? null : SnapshotState.valueOf(state);
Version version = this.version == -1 ? Version.CURRENT : Version.fromId(this.version);
@ -165,7 +177,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
shardFailures = new ArrayList<>();
}
return new SnapshotInfo(snapshotId, indices, snapshotState, reason, version, startTime, endTime,
return new SnapshotInfo(snapshotId, indices, dataStreams, snapshotState, reason, version, startTime, endTime,
totalShards, successfulShards, shardFailures, includeGlobalState, userMetadata);
}
}
@ -203,6 +215,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
SNAPSHOT_INFO_PARSER.declareString(SnapshotInfoBuilder::setState, new ParseField(STATE));
SNAPSHOT_INFO_PARSER.declareString(SnapshotInfoBuilder::setReason, new ParseField(REASON));
SNAPSHOT_INFO_PARSER.declareStringArray(SnapshotInfoBuilder::setIndices, new ParseField(INDICES));
SNAPSHOT_INFO_PARSER.declareStringArray(SnapshotInfoBuilder::setDataStreams, new ParseField(DATA_STREAMS));
SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setStartTime, new ParseField(START_TIME_IN_MILLIS));
SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setEndTime, new ParseField(END_TIME_IN_MILLIS));
SNAPSHOT_INFO_PARSER.declareObject(SnapshotInfoBuilder::setShardStatsBuilder, SHARD_STATS_PARSER, new ParseField(SHARDS));
@ -226,6 +239,8 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private final List<String> indices;
private final List<String> dataStreams;
private final long startTime;
private final long endTime;
@ -245,34 +260,33 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private final List<SnapshotShardFailure> shardFailures;
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state) {
this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0,
Collections.emptyList(), null, null);
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state) {
this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null);
}
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state, Version version) {
this(snapshotId, indices, state, null, version, 0L, 0L, 0, 0,
Collections.emptyList(), null, null);
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state, Version version) {
this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null);
}
public SnapshotInfo(SnapshotsInProgress.Entry entry) {
this(entry.snapshot().getSnapshotId(),
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), SnapshotState.IN_PROGRESS, null, Version.CURRENT,
entry.startTime(), 0L, 0, 0, Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata());
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), entry.dataStreams(), SnapshotState.IN_PROGRESS,
null, Version.CURRENT, entry.startTime(), 0L, 0, 0, Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata());
}
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, String reason, long endTime,
int totalShards, List<SnapshotShardFailure> shardFailures, Boolean includeGlobalState,
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, long startTime, String reason,
long endTime, int totalShards, List<SnapshotShardFailure> shardFailures, Boolean includeGlobalState,
Map<String, Object> userMetadata) {
this(snapshotId, indices, snapshotState(reason, shardFailures), reason, Version.CURRENT,
this(snapshotId, indices, dataStreams, snapshotState(reason, shardFailures), reason, Version.CURRENT,
startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures, includeGlobalState, userMetadata);
}
private SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state, String reason, Version version,
long startTime, long endTime, int totalShards, int successfulShards, List<SnapshotShardFailure> shardFailures,
Boolean includeGlobalState, Map<String, Object> userMetadata) {
private SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state, String reason,
Version version, long startTime, long endTime, int totalShards, int successfulShards,
List<SnapshotShardFailure> shardFailures, Boolean includeGlobalState, Map<String, Object> userMetadata) {
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices));
this.dataStreams = Collections.unmodifiableList(Objects.requireNonNull(dataStreams));
this.state = state;
this.reason = reason;
this.version = version;
@ -321,6 +335,11 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
} else {
userMetadata = null;
}
if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
dataStreams = in.readStringList();
} else {
dataStreams = Collections.emptyList();
}
}
/**
@ -328,7 +347,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
* all information stripped out except the snapshot id, state, and indices.
*/
public SnapshotInfo basic() {
return new SnapshotInfo(snapshotId, indices, state);
return new SnapshotInfo(snapshotId, indices, Collections.emptyList(), state);
}
/**
@ -369,6 +388,13 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
return indices;
}
/**
* @return list of data streams that were included in this snapshot.
*/
public List<String> dataStreams(){
return dataStreams;
}
/**
* Returns time when snapshot started; a value of {@code 0L} will be returned if
* {@link #state()} returns {@code null}.
@ -514,6 +540,11 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
builder.value(index);
}
builder.endArray();
builder.startArray(DATA_STREAMS);
for (String dataStream : dataStreams) {
builder.value(dataStream);
}
builder.endArray();
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
@ -564,6 +595,11 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
builder.value(index);
}
builder.endArray();
builder.startArray(DATA_STREAMS);
for (String dataStream : dataStreams) {
builder.value(dataStream);
}
builder.endArray();
builder.field(STATE, state);
if (reason != null) {
builder.field(REASON, reason);
@ -597,6 +633,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
SnapshotState state = SnapshotState.IN_PROGRESS;
String reason = null;
List<String> indices = Collections.emptyList();
List<String> dataStreams = Collections.emptyList();
long startTime = 0;
long endTime = 0;
int totalShards = 0;
@ -641,7 +678,12 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
includeGlobalState = parser.booleanValue();
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (INDICES.equals(currentFieldName)) {
if (DATA_STREAMS.equals(currentFieldName)){
dataStreams = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
dataStreams.add(parser.text());
}
} else if (INDICES.equals(currentFieldName)) {
ArrayList<String> indicesArray = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
indicesArray.add(parser.text());
@ -677,6 +719,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
}
return new SnapshotInfo(new SnapshotId(name, uuid),
indices,
dataStreams,
state,
reason,
version,
@ -722,6 +765,9 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
}
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(userMetadata);
if(out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)){
out.writeStringCollection(dataStreams);
}
}
}
@ -750,6 +796,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
state == that.state &&
Objects.equals(reason, that.reason) &&
Objects.equals(indices, that.indices) &&
Objects.equals(dataStreams, that.dataStreams) &&
Objects.equals(includeGlobalState, that.includeGlobalState) &&
Objects.equals(version, that.version) &&
Objects.equals(shardFailures, that.shardFailures) &&
@ -759,7 +806,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
@Override
public int hashCode() {
return Objects.hash(snapshotId, state, reason, indices, startTime, endTime,
return Objects.hash(snapshotId, state, reason, indices, dataStreams, startTime, endTime,
totalShards, successfulShards, includeGlobalState, version, shardFailures, userMetadata);
}
}

View File

@ -44,6 +44,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
@ -207,13 +208,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
// Store newSnapshot here to be processed in clusterStateProcessed
indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
request.indicesOptions(), request.indices()));
request.indicesOptions(), true, request.indices()));
Map<String, DataStream> allDataStreams = currentState.metadata().dataStreams();
List<String> dataStreams;
if (request.includeGlobalState()) {
dataStreams = new ArrayList<>(allDataStreams.keySet());
} else {
dataStreams = indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices());
}
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
newSnapshot = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(), request.partial(),
State.INIT,
Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
dataStreams,
threadPool.absoluteTimeInMillis(),
RepositoryData.UNKNOWN_REPO_GEN,
null,
@ -508,6 +519,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
builder.put(indexMetadata, false);
}
}
Map<String, DataStream> dataStreams = new HashMap<>();
for (String dataStreamName : snapshot.dataStreams()) {
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
if (dataStream == null) {
assert snapshot.partial() : "Data stream [" + dataStreamName +
"] was deleted during a snapshot but snapshot was not partial.";
} else {
dataStreams.put(dataStreamName, dataStream);
}
}
builder.dataStreams(dataStreams);
metadata = builder.build();
}
return metadata;
@ -1475,6 +1498,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
return builder.build();
}
/**
* Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the
* indices-to-check set.
*/
public static Set<String> snapshottingDataStreams(final ClusterState currentState, final Set<String> dataStreamsToCheck) {
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null) {
return emptySet();
}
Map<String, DataStream> dataStreams = currentState.metadata().dataStreams();
return snapshots.entries().stream()
.filter(e -> e.partial() == false)
.flatMap(e -> e.dataStreams().stream())
.filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds))
.collect(Collectors.toSet());
}
/**
* Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set.
*/

View File

@ -51,6 +51,10 @@ public class CreateSnapshotResponseTests extends AbstractXContentTestCase<Create
List<String> indices = new ArrayList<>();
indices.add("test0");
indices.add("test1");
List<String> dataStreams = new ArrayList<>();
dataStreams.add("test0");
dataStreams.add("test1");
String reason = "reason";
long startTime = System.currentTimeMillis();
long endTime = startTime + 10000;
@ -66,7 +70,7 @@ public class CreateSnapshotResponseTests extends AbstractXContentTestCase<Create
boolean globalState = randomBoolean();
return new CreateSnapshotResponse(
new SnapshotInfo(snapshotId, indices, startTime, reason, endTime, totalShards, shardFailures,
new SnapshotInfo(snapshotId, indices, dataStreams, startTime, reason, endTime, totalShards, shardFailures,
globalState, SnapshotInfoTests.randomUserMetadata()));
}

View File

@ -52,8 +52,8 @@ public class GetSnapshotsResponseTests extends AbstractSerializingTestCase<GetSn
String reason = randomBoolean() ? null : "reason";
ShardId shardId = new ShardId("index", UUIDs.base64UUID(), 2);
List<SnapshotShardFailure> shardFailures = Collections.singletonList(new SnapshotShardFailure("node-id", shardId, "reason"));
snapshots.add(new SnapshotInfo(snapshotId, Arrays.asList("indice1", "indice2"), System.currentTimeMillis(), reason,
System.currentTimeMillis(), randomIntBetween(2, 3), shardFailures, randomBoolean(),
snapshots.add(new SnapshotInfo(snapshotId, Arrays.asList("index1", "index2"), Collections.singletonList("ds"),
System.currentTimeMillis(), reason, System.currentTimeMillis(), randomIntBetween(2, 3), shardFailures, randomBoolean(),
SnapshotInfoTests.randomUserMetadata()));
}
return new GetSnapshotsResponse(snapshots);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
@ -32,9 +33,14 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -85,6 +91,32 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
}
}
public void testDeleteSnapshottingDataStream() {
final String dataStreamName = "my-data-stream1";
final String dataStreamName2 = "my-data-stream2";
final List<String> otherIndices = randomSubsetOf(Arrays.asList("foo", "bar", "baz"));
ClusterState cs = getClusterStateWithDataStreams(Arrays.asList(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)),
otherIndices);
SnapshotsInProgress snapshotsInProgress = new SnapshotsInProgress(Arrays.asList(
createEntry(dataStreamName, "repo1", false),
createEntry(dataStreamName2, "repo2", true)));
ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
SnapshotInProgressException e = expectThrows(SnapshotInProgressException.class,
() -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), snapshotCs, req));
assertThat(e.getMessage(), equalTo("Cannot delete data streams that are being snapshotted: [my-data-stream1]. Try again after " +
"snapshot finishes or cancel the currently running snapshot."));
}
private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
return new SnapshotsInProgress.Entry(new Snapshot(repo, new SnapshotId("", "")), false, partial,
SnapshotsInProgress.State.STARTED, Collections.emptyList(), Collections.singletonList(dataStreamName), 0, 1, null, null, null
, null);
}
public void testDeleteNonexistentDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();

View File

@ -1944,4 +1944,45 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
assertThat(result[1].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStream1, 2)));
assertThat(result[2].getName(), equalTo("logs-foobarbaz-0"));
}
public void testDataStreamsNames() {
final String dataStream1 = "logs-foobar";
final String dataStream2 = "other-foobar";
IndexMetadata index1 = createBackingIndex(dataStream1, 1).build();
IndexMetadata index2 = createBackingIndex(dataStream1, 2).build();
IndexMetadata justAnIndex = IndexMetadata.builder("logs-foobarbaz-0")
.settings(ESTestCase.settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.putAlias(new AliasMetadata.Builder("logs-foobarbaz"))
.build();
IndexMetadata index3 = createBackingIndex(dataStream2, 1).build();
IndexMetadata index4 = createBackingIndex(dataStream2, 2).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder()
.put(index1, false)
.put(index2, false)
.put(index3, false)
.put(index4, false)
.put(justAnIndex, false)
.put(new DataStream(dataStream1, "ts", Arrays.asList(index1.getIndex(), index2.getIndex())))
.put(new DataStream(dataStream2, "ts", Arrays.asList(index3.getIndex(), index4.getIndex())))).build();
List<String> names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "log*");
assertEquals(Collections.singletonList(dataStream1), names);
names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "other*");
assertEquals(Collections.singletonList(dataStream2), names);
names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "*foobar");
assertThat(names, containsInAnyOrder(dataStream1, dataStream2));
names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "notmatched");
assertThat(names, empty());
names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), index3.getIndex().getName());
assertThat(names, empty());
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
import java.util.List;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RestoreServiceTests extends ESTestCase {
public void testUpdateDataStream() {
String dataStreamName = "data-stream-1";
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
List<Index> indices = Collections.singletonList(new Index(backingIndexName, "uuid"));
DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices);
Metadata.Builder metadata = mock(Metadata.Builder.class);
IndexMetadata indexMetadata = mock(IndexMetadata.class);
when(metadata.get(eq(backingIndexName))).thenReturn(indexMetadata);
Index updatedIndex = new Index(backingIndexName, "uuid2");
when(indexMetadata.getIndex()).thenReturn(updatedIndex);
RestoreSnapshotRequest request = new RestoreSnapshotRequest();
DataStream updateDataStream = RestoreService.updateDataStream(dataStream, metadata, request);
assertEquals(dataStreamName, updateDataStream.getName());
assertEquals(Collections.singletonList(updatedIndex), updateDataStream.getIndices());
}
public void testUpdateDataStreamRename() {
String dataStreamName = "data-stream-1";
String renamedDataStreamName = "data-stream-2";
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String renamedBackingIndexName = DataStream.getDefaultBackingIndexName(renamedDataStreamName, 1);
List<Index> indices = Collections.singletonList(new Index(backingIndexName, "uuid"));
DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices);
Metadata.Builder metadata = mock(Metadata.Builder.class);
IndexMetadata indexMetadata = mock(IndexMetadata.class);
when(metadata.get(eq(renamedBackingIndexName))).thenReturn(indexMetadata);
Index renamedIndex = new Index(renamedBackingIndexName, "uuid2");
when(indexMetadata.getIndex()).thenReturn(renamedIndex);
RestoreSnapshotRequest request = new RestoreSnapshotRequest().renamePattern("data-stream-1").renameReplacement("data-stream-2");
DataStream renamedDataStream = RestoreService.updateDataStream(dataStream, metadata, request);
assertEquals(renamedDataStreamName, renamedDataStream.getName());
assertEquals(Collections.singletonList(renamedIndex), renamedDataStream.getIndices());
}
public void testPrefixNotChanged() {
String dataStreamName = "ds-000001";
String renamedDataStreamName = "ds2-000001";
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
String renamedBackingIndexName = DataStream.getDefaultBackingIndexName(renamedDataStreamName, 1);
List<Index> indices = Collections.singletonList(new Index(backingIndexName, "uuid"));
DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices);
Metadata.Builder metadata = mock(Metadata.Builder.class);
IndexMetadata indexMetadata = mock(IndexMetadata.class);
when(metadata.get(eq(renamedBackingIndexName))).thenReturn(indexMetadata);
Index renamedIndex = new Index(renamedBackingIndexName, "uuid2");
when(indexMetadata.getIndex()).thenReturn(renamedIndex);
RestoreSnapshotRequest request = new RestoreSnapshotRequest().renamePattern("ds-").renameReplacement("ds2-");
DataStream renamedDataStream = RestoreService.updateDataStream(dataStream, metadata, request);
assertEquals(renamedDataStreamName, renamedDataStream.getName());
assertEquals(Collections.singletonList(renamedIndex), renamedDataStream.getIndices());
request = new RestoreSnapshotRequest().renamePattern("ds-000001").renameReplacement("ds2-000001");
renamedDataStream = RestoreService.updateDataStream(dataStream, metadata, request);
assertEquals(renamedDataStreamName, renamedDataStream.getName());
assertEquals(Collections.singletonList(renamedIndex), renamedDataStream.getIndices());
}
}

View File

@ -36,6 +36,8 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase<SnapshotI
SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5));
List<String> indices = Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20)));
List<String> dataStreams = Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20)));
String reason = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 15);
long startTime = randomNonNegativeLong();
@ -57,7 +59,7 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase<SnapshotI
Map<String, Object> userMetadata = randomUserMetadata();
return new SnapshotInfo(snapshotId, indices, startTime, reason, endTime, totalShards, shardFailures,
return new SnapshotInfo(snapshotId, indices, dataStreams, startTime, reason, endTime, totalShards, shardFailures,
includeGlobalState, userMetadata);
}
@ -68,34 +70,34 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase<SnapshotI
@Override
protected SnapshotInfo mutateInstance(SnapshotInfo instance) {
switch (randomIntBetween(0, 7)) {
switch (randomIntBetween(0, 8)) {
case 0:
SnapshotId snapshotId = new SnapshotId(
randomValueOtherThan(instance.snapshotId().getName(), () -> randomAlphaOfLength(5)),
randomValueOtherThan(instance.snapshotId().getUUID(), () -> randomAlphaOfLength(5)));
return new SnapshotInfo(snapshotId, instance.indices(), instance.startTime(), instance.reason(),
return new SnapshotInfo(snapshotId, instance.indices(), instance.dataStreams(), instance.startTime(), instance.reason(),
instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(),
instance.userMetadata());
case 1:
int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10));
List<String> indices = Arrays.asList(randomArray(indicesSize, indicesSize, String[]::new,
() -> randomAlphaOfLengthBetween(2, 20)));
return new SnapshotInfo(instance.snapshotId(), indices, instance.startTime(), instance.reason(),
return new SnapshotInfo(instance.snapshotId(), indices, instance.dataStreams(), instance.startTime(), instance.reason(),
instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(),
instance.userMetadata());
case 2:
return new SnapshotInfo(instance.snapshotId(), instance.indices(),
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(),
randomValueOtherThan(instance.startTime(), ESTestCase::randomNonNegativeLong), instance.reason(),
instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(),
instance.userMetadata());
case 3:
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(),
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), instance.startTime(),
randomValueOtherThan(instance.reason(), () -> randomAlphaOfLengthBetween(5, 15)), instance.endTime(),
instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata());
case 4:
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(), instance.reason(),
randomValueOtherThan(instance.endTime(), ESTestCase::randomNonNegativeLong), instance.totalShards(),
instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata());
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(),
instance.startTime(), instance.reason(), randomValueOtherThan(instance.endTime(), ESTestCase::randomNonNegativeLong),
instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata());
case 5:
int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100));
int failedShards = randomIntBetween(0, totalShards);
@ -108,16 +110,23 @@ public class SnapshotInfoTests extends AbstractWireSerializingTestCase<SnapshotI
return new SnapshotShardFailure(randomAlphaOfLengthBetween(5, 10), shardId, randomAlphaOfLengthBetween(5, 10));
}));
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(), instance.reason(),
instance.endTime(), totalShards, shardFailures, instance.includeGlobalState(), instance.userMetadata());
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), instance.startTime(),
instance.reason(), instance.endTime(), totalShards, shardFailures, instance.includeGlobalState(),
instance.userMetadata());
case 6:
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(), instance.reason(),
instance.endTime(), instance.totalShards(), instance.shardFailures(),
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), instance.startTime(),
instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(),
Boolean.FALSE.equals(instance.includeGlobalState()), instance.userMetadata());
case 7:
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.startTime(), instance.reason(),
instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(),
return new SnapshotInfo(instance.snapshotId(), instance.indices(), instance.dataStreams(), instance.startTime(),
instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(),
randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata));
case 8:
List<String> dataStreams = randomValueOtherThan(instance.dataStreams(),
() -> Arrays.asList(randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(2, 20))));
return new SnapshotInfo(instance.snapshotId(), instance.indices(), dataStreams,
instance.startTime(), instance.reason(), instance.endTime(), instance.totalShards(), instance.shardFailures(),
instance.includeGlobalState(), instance.userMetadata());
default:
throw new IllegalArgumentException("invalid randomization case");
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.test.AbstractDiffableWireSerializationTestCase;
import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@ -66,6 +67,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
final List<Index> esIndices =
indices.stream().map(i -> new Index(i.getName(), randomAlphaOfLength(10))).collect(Collectors.toList());
List<String> dataStreams = Arrays.asList(generateRandomStringArray(10, 10, false));
for (Index idx : esIndices) {
int shardsCount = randomIntBetween(1, 10);
for (int j = 0; j < shardsCount; j++) {
@ -77,7 +79,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
}
}
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards,
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
}

View File

@ -172,11 +172,13 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
Client remoteClient = getRemoteClusterClient();
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetadata(true).setNodes(true)
.get(ccrSettings.getRecoveryActionTimeout());
ImmutableOpenMap<String, IndexMetadata> indicesMap = response.getState().metadata().indices();
Metadata metadata = response.getState().metadata();
ImmutableOpenMap<String, IndexMetadata> indicesMap = metadata.indices();
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
indicesMap.keysIt().forEachRemaining(indices::add);
return new SnapshotInfo(snapshotId, indices, SnapshotState.SUCCESS, response.getState().getNodes().getMaxNodeVersion());
return new SnapshotInfo(snapshotId, indices, new ArrayList<>(metadata.dataStreams().keySet()), SnapshotState.SUCCESS,
response.getState().getNodes().getMaxNodeVersion());
}
@Override

View File

@ -249,6 +249,7 @@ public class SnapshotRetentionConfigurationTests extends ESTestCase {
final int totalShards = between(1,20);
SnapshotInfo snapInfo = new SnapshotInfo(new SnapshotId("snap-" + randomAlphaOfLength(3), "uuid"),
Collections.singletonList("foo"),
Collections.singletonList("bar"),
startTime,
null,
startTime + between(1, 10000),
@ -280,6 +281,7 @@ public class SnapshotRetentionConfigurationTests extends ESTestCase {
assert failureCount == failures.size();
SnapshotInfo snapInfo = new SnapshotInfo(new SnapshotId("snap-fail-" + randomAlphaOfLength(3), "uuid-fail"),
Collections.singletonList("foo-fail"),
Collections.singletonList("bar-fail"),
startTime,
"forced-failure",
startTime + between(1, 10000),
@ -303,6 +305,7 @@ public class SnapshotRetentionConfigurationTests extends ESTestCase {
assert failureCount == failures.size();
SnapshotInfo snapInfo = new SnapshotInfo(new SnapshotId("snap-fail-" + randomAlphaOfLength(3), "uuid-fail"),
Collections.singletonList("foo-fail"),
Collections.singletonList("bar-fail"),
startTime,
null,
startTime + between(1, 10000),

View File

@ -15,7 +15,10 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.indexlifecycle.RolloverAction;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
@ -48,10 +51,11 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -458,6 +462,48 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase {
}
}
public void testDataStreams() throws Exception {
String dataStreamName = "ds-test";
String repoId = "ds-repo";
String policyName = "ds-policy";
String mapping = "{\n" +
" \"properties\": {\n" +
" \"@timestamp\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
" }";
Template template = new Template(null, new CompressedXContent(mapping), null);
createComposableTemplate(client(), "ds-template", dataStreamName, template);
client().performRequest(new Request("PUT", "_data_stream/" + dataStreamName));
// Create a snapshot repo
initializeRepo(repoId);
createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, repoId, dataStreamName, true);
final String snapshotName = executePolicy(policyName);
// Check that the executed snapshot is created
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
Map<String, Object> snapshotResponseMap;
try (InputStream is = response.getEntity().getContent()) {
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
assertThat(snapshotResponseMap.size(), greaterThan(0));
final Map<String, Object> snapshot = extractSnapshot(snapshotResponseMap, snapshotName);
assertEquals(Collections.singletonList(dataStreamName), snapshot.get("data_streams"));
assertEquals(Collections.singletonList(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), snapshot.get("indices"));
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
});
}
@SuppressWarnings("unchecked")
public void testSLMXpackInfo() {
Map<String, Object> features = (Map<String, Object>) getLocation("/_xpack").get("features");
@ -546,10 +592,14 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase {
@SuppressWarnings("unchecked")
private static Map<String, Object> extractMetadata(Map<String, Object> snapshotResponseMap, String snapshotPrefix) {
return (Map<String, Object>) extractSnapshot(snapshotResponseMap, snapshotPrefix).get("metadata");
}
@SuppressWarnings("unchecked")
private static Map<String, Object> extractSnapshot(Map<String, Object> snapshotResponseMap, String snapshotPrefix) {
List<Map<String, Object>> snapshots = ((List<Map<String, Object>>) snapshotResponseMap.get("snapshots"));
return snapshots.stream()
.filter(snapshot -> ((String) snapshot.get("snapshot")).startsWith(snapshotPrefix))
.map(snapshot -> (Map<String, Object>) snapshot.get("metadata"))
.findFirst()
.orElse(null);
}

View File

@ -242,6 +242,7 @@ public class SnapshotLifecycleTaskTests extends ESTestCase {
new SnapshotInfo(
new SnapshotId(req.snapshot(), "uuid"),
Arrays.asList(req.indices()),
Collections.emptyList(),
startTime,
"snapshot started",
endTime,

View File

@ -124,33 +124,33 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
// Test when user metadata is null
SnapshotInfo info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
0L, null, 1L, 1, Collections.emptyList(), true, null);
Collections.emptyList(),0L, null, 1L, 1, Collections.emptyList(), true, null);
assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false));
// Test when no retention is configured
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(),
0L, null, 1L, 1, Collections.emptyList(), true, null);
assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyWithNoRetentionMap), equalTo(false));
// Test when user metadata is a map that doesn't contain "policy"
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(),
0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("foo", "bar"));
assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false));
// Test with an ancient snapshot that should be expunged
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(),
0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy"));
assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true));
// Test with a snapshot that's start date is old enough to be expunged (but the finish date is not)
long time = System.currentTimeMillis() - TimeValue.timeValueDays(30).millis() - 1;
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(),
time, null, time + TimeValue.timeValueDays(4).millis(), 1, Collections.emptyList(),
true, Collections.singletonMap("policy", "policy"));
assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true));
// Test with a fresh snapshot that should not be expunged
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), Collections.emptyList(),
System.currentTimeMillis(), null, System.currentTimeMillis() + 1,
1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy"));
assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false));
@ -178,9 +178,9 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
ClusterServiceUtils.setState(clusterService, state);
final SnapshotInfo eligibleSnapshot = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"),
0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"),
System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1,
Collections.emptyList(), System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1,
Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Set<SnapshotId> deleted = ConcurrentHashMap.newKeySet();
@ -266,15 +266,15 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
ClusterServiceUtils.setState(clusterService, state);
final SnapshotInfo snap1 = new SnapshotInfo(new SnapshotId("name1", "uuid1"), Collections.singletonList("index"),
0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Collections.emptyList(), 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
final SnapshotInfo snap2 = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"),
1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Collections.emptyList(), 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
final SnapshotInfo snap3 = new SnapshotInfo(new SnapshotId("name3", "uuid3"), Collections.singletonList("index"),
2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Collections.emptyList(), 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
final SnapshotInfo snap4 = new SnapshotInfo(new SnapshotId("name4", "uuid4"), Collections.singletonList("index"),
3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Collections.emptyList(), 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
final SnapshotInfo snap5 = new SnapshotInfo(new SnapshotId("name5", "uuid5"), Collections.singletonList("index"),
4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
Collections.emptyList(), 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", policyId));
final Set<SnapshotId> deleted = ConcurrentHashMap.newKeySet();
// We're expected two deletions before they hit the "taken too long" test, so have a latch of 2