Clone Snapshot API (#61839) (#63291)

Snapshot clone API. Complete except for some TODOs around documentation (and adding HLRC support).

backport of #61839, #63217, #63037
This commit is contained in:
Armin Braun 2020-10-06 01:52:25 +02:00 committed by GitHub
parent 25f8a3ba42
commit 5c3a4c13dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 2143 additions and 129 deletions

View File

@ -0,0 +1,52 @@
[[clone-snapshot-api]]
=== Clone snapshot API
++++
<titleabbrev>Clone snapshot</titleabbrev>
++++
Clones part or all of a snapshot into a new snapshot.
[source,console]
----
PUT /_snapshot/my_repository/source_snapshot/_clone/target_snapshot
{
"indices": "index_a,index_b"
}
----
// TEST[skip:TODO]
[[clone-snapshot-api-request]]
==== {api-request-title}
`PUT /_snapshot/<repository>/<source_snapshot>/_clone/<target_snapshot>`
[[clone-snapshot-api-desc]]
==== {api-description-title}
The clone snapshot API allows creating a copy of all or part of an existing snapshot
within the same repository.
[[clone-snapshot-api-params]]
==== {api-path-parms-title}
`<repository>`::
(Required, string)
Name of the snapshot repository that both source and target snapshot belong to.

`<source_snapshot>`::
(Required, string)
Name of the existing snapshot to clone from.

`<target_snapshot>`::
(Required, string)
Name of the new snapshot that the clone is created as.
[[clone-snapshot-api-query-params]]
==== {api-query-parms-title}
`master_timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
`timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a response. If no response is received before the timeout expires, the request
fails and returns an error. Defaults to `30s`.
[[clone-snapshot-api-request-body]]
==== {api-request-body-title}

`indices`::
(Required, string)
A comma-separated list of indices to include in the snapshot.
<<multi-index,Multi-index syntax>> is supported.

View File

@ -107,6 +107,7 @@ understand the time requirements before proceeding.
--
include::register-repository.asciidoc[]
include::apis/clone-snapshot-api.asciidoc[]
include::take-snapshot.asciidoc[]
include::restore-snapshot.asciidoc[]
include::monitor-snapshot-restore.asciidoc[]

View File

@ -124,3 +124,5 @@ PUT /_snapshot/my_backup/<snapshot-{now/d}>
PUT /_snapshot/my_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E
-----------------------------------
// TEST[continued]
NOTE: You can also create snapshots that are copies of part of an existing snapshot using the <<clone-snapshot-api,clone snapshot API>>.

View File

@ -18,6 +18,26 @@
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
@ -26,16 +46,11 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.test.ESIntegTestCase;
import java.nio.file.Path;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ -53,7 +68,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
if (useBwCFormat) {
initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
// Re-create repo to clear repository data cache
assertAcked(client().admin().cluster().prepareDeleteRepository(repoName).get());
assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get());
createRepository(repoName, "fs", repoPath);
}
@ -107,6 +122,389 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
assertEquals(newShardGeneration, newShardGeneration2);
}
/**
 * Clones a single index from a source snapshot into a target snapshot and verifies that both
 * snapshots report identical total file count and size for that index.
 */
public void testCloneSnapshotIndex() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "repo-name";
createRepository(repoName, "fs");
final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
// mutate (and sometimes delete) the index after the source snapshot: the clone must be taken
// from the snapshot contents, independent of the live index state
indexRandomDocs(indexName, randomIntBetween(20, 100));
if (randomBoolean()) {
assertAcked(admin().indices().prepareDelete(indexName));
}
final String targetSnapshot = "target-snapshot";
assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexName).get());
final List<SnapshotStatus> status = clusterAdmin().prepareSnapshotStatus(repoName)
.setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots();
assertThat(status, hasSize(2));
final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName);
final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName);
// clone should reference the same shard data as the source snapshot
assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount());
assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
}
/**
 * Verifies that deleting a snapshot is rejected with a {@code ConcurrentSnapshotExecutionException}
 * while that snapshot is being cloned, and that the clone completes successfully once unblocked.
 */
public void testClonePreventsSnapshotDelete() throws Exception {
final String masterName = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "repo-name";
createRepository(repoName, "mock");
final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
indexRandomDocs(indexName, randomIntBetween(20, 100));
final String targetSnapshot = "target-snapshot";
// block the master's repository so the clone stays in progress
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
assertFalse(cloneFuture.isDone());
ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class,
() -> startDeleteSnapshot(repoName, sourceSnapshot).actionGet());
assertThat(ex.getMessage(), containsString("cannot delete snapshot while it is being cloned"));
unblockNode(repoName, masterName);
assertAcked(cloneFuture.get());
final List<SnapshotStatus> status = clusterAdmin().prepareSnapshotStatus(repoName)
.setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots();
assertThat(status, hasSize(2));
final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName);
final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName);
assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount());
assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
}
/**
 * Verifies that a clone can start and finish while a regular snapshot is blocked on a data node,
 * and that both operations complete successfully.
 */
public void testConcurrentCloneAndSnapshot() throws Exception {
internalCluster().startMasterOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "repo-name";
createRepository(repoName, "mock");
final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
indexRandomDocs(indexName, randomIntBetween(20, 100));
final String targetSnapshot = "target-snapshot";
// start a snapshot that hangs on the data node, then run the clone concurrently
final ActionFuture<CreateSnapshotResponse> snapshot2Future =
startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
awaitNumberOfSnapshotsInProgress(2);
unblockNode(repoName, dataNode);
assertAcked(cloneFuture.get());
assertSuccessful(snapshot2Future);
}
/**
 * Verifies that a snapshot of an unrelated index can start and finish while a clone is blocked
 * on the master's shard clone step.
 */
public void testLongRunningCloneAllowsConcurrentSnapshot() throws Exception {
// large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String indexSlow = "index-slow";
createIndexWithContent(indexSlow);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
final String targetSnapshot = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
final String indexFast = "index-fast";
createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
// the "fast" snapshot must complete while the clone is still blocked
assertSuccessful(clusterAdmin().prepareCreateSnapshot(repoName, "fast-snapshot")
.setIndices(indexFast).setWaitForCompletion(true).execute());
assertThat(cloneFuture.isDone(), is(false));
unblockNode(repoName, masterNode);
assertAcked(cloneFuture.get());
}
/**
 * Mirror image of {@code testLongRunningCloneAllowsConcurrentSnapshot}: a clone must be able to
 * start and finish while a regular snapshot is blocked on a data node.
 */
public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception {
internalCluster().startMasterOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String indexSlow = "index-slow";
createIndexWithContent(indexSlow);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
final String indexFast = "index-fast";
createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
// stall the "fast" snapshot on the data node, then run the clone to completion underneath it
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin()
.prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute();
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
final String targetSnapshot = "target-snapshot";
assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get());
assertThat(snapshotFuture.isDone(), is(false));
unblockNode(repoName, dataNode);
assertSuccessful(snapshotFuture);
}
/**
 * Verifies that starting a clone from a snapshot that is currently being deleted is rejected
 * with a {@code ConcurrentSnapshotExecutionException}, and that the delete still completes.
 */
public void testDeletePreventsClone() throws Exception {
final String masterName = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "repo-name";
createRepository(repoName, "mock");
final String indexName = "index-1";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
indexRandomDocs(indexName, randomIntBetween(20, 100));
final String targetSnapshot = "target-snapshot";
// keep the delete in progress by blocking the master's repository
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
assertFalse(deleteFuture.isDone());
ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
startClone(repoName, sourceSnapshot, targetSnapshot, indexName).actionGet());
assertThat(ex.getMessage(), containsString("cannot clone from snapshot that is being deleted"));
unblockNode(repoName, masterName);
assertAcked(deleteFuture.get());
}
/**
 * Queues several clones (and optionally snapshots) behind one blocked clone of an index that no
 * longer exists in the cluster, then verifies that all queued operations complete once unblocked.
 */
public void testBackToBackClonesForIndexNotInCluster() throws Exception {
// large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String indexBlocked = "index-blocked";
createIndexWithContent(indexBlocked);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
// delete the index so the clones operate purely on repository contents
assertAcked(admin().indices().prepareDelete(indexBlocked).get());
final String targetSnapshot1 = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
assertThat(cloneFuture1.isDone(), is(false));
final int extraClones = randomIntBetween(1, 5);
final List<ActionFuture<AcknowledgedResponse>> extraCloneFutures = new ArrayList<>(extraClones);
for (int i = 0; i < extraClones; i++) {
extraCloneFutures.add(startClone(repoName, sourceSnapshot, "target-snapshot-" + i, indexBlocked));
}
awaitNumberOfSnapshotsInProgress(1 + extraClones);
// queued clones must wait for the blocked first clone
for (ActionFuture<AcknowledgedResponse> extraCloneFuture : extraCloneFutures) {
assertFalse(extraCloneFuture.isDone());
}
final int extraSnapshots = randomIntBetween(0, 5);
if (extraSnapshots > 0) {
createIndexWithContent(indexBlocked);
}
final List<ActionFuture<CreateSnapshotResponse>> extraSnapshotFutures = new ArrayList<>(extraSnapshots);
for (int i = 0; i < extraSnapshots; i++) {
extraSnapshotFutures.add(startFullSnapshot(repoName, "extra-snap-" + i));
}
awaitNumberOfSnapshotsInProgress(1 + extraClones + extraSnapshots);
for (ActionFuture<CreateSnapshotResponse> extraSnapshotFuture : extraSnapshotFutures) {
assertFalse(extraSnapshotFuture.isDone());
}
unblockNode(repoName, masterNode);
assertAcked(cloneFuture1.get());
for (ActionFuture<AcknowledgedResponse> extraCloneFuture : extraCloneFutures) {
assertAcked(extraCloneFuture.get());
}
for (ActionFuture<CreateSnapshotResponse> extraSnapshotFuture : extraSnapshotFutures) {
assertSuccessful(extraSnapshotFuture);
}
}
/**
 * Restarts the master while a clone is blocked reading index metadata (the first step of a clone)
 * and verifies the repository is left in a consistent state whether or not the clone survived.
 */
public void testMasterFailoverDuringCloneStep1() throws Exception {
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
blockMasterOnReadIndexMeta(repoName);
// start via a data-node client so the request survives the master restart
final ActionFuture<AcknowledgedResponse> cloneFuture =
startCloneFromDataNode(repoName, sourceSnapshot, "target-snapshot", testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
internalCluster().restartNode(masterNode);
boolean cloneSucceeded = false;
try {
cloneFuture.actionGet(TimeValue.timeValueSeconds(30L));
cloneSucceeded = true;
} catch (SnapshotException sne) {
// ignored, most of the time we will throw here but we could randomly run into a situation where the data node retries the
// snapshot on disconnect slowly enough for it to work out
}
awaitNoMoreRunningOperations(internalCluster().getMasterName());
// either just the source snapshot, or source + clone, must be present and successful
assertAllSnapshotsSuccessful(getRepositoryData(repoName), cloneSucceeded ? 2 : 1);
}
/**
 * Verifies that cloning an index name that is not contained in the source snapshot fails with
 * {@code IndexNotFoundException}.
 */
public void testFailsOnCloneMissingIndices() {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "repo-name";
final Path repoPath = randomRepoPath();
if (randomBoolean()) {
createIndexWithContent("test-idx");
}
createRepository(repoName, "fs", repoPath);
final String snapshotName = "snapshot";
createFullSnapshot(repoName, snapshotName);
expectThrows(IndexNotFoundException.class,
() -> startClone(repoName, snapshotName, "target-snapshot", "does-not-exist").actionGet());
}
/**
 * Restarts the master while a clone is blocked on the shard clone step (the second step of a
 * clone); the clone request fails, but the clone itself must be finished by the new master,
 * leaving two successful snapshots in the repository.
 */
public void testMasterFailoverDuringCloneStep2() throws Exception {
// large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
final String targetSnapshot = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
internalCluster().restartNode(masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations(internalCluster().getMasterName());
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2);
}
/**
 * Injects a failure while finalizing the cloned snapshot and verifies the failed clone leaves no
 * trace: only the source snapshot remains, and it can still be deleted afterwards.
 */
public void testExceptionDuringShardClone() throws Exception {
// large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
final String targetSnapshot = "target-snapshot";
blockMasterFromFinalizingSnapshotOnSnapFile(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
unblockNode(repoName, masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations(internalCluster().getMasterName());
// only the source snapshot must remain
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}
/**
 * Produces a PARTIAL source snapshot (by restarting the data node mid-snapshot) and verifies that
 * cloning an unsuccessfully-snapshotted index is rejected with a {@code SnapshotException}.
 */
public void testDoesNotStartOnBrokenSourceSnapshot() throws Exception {
internalCluster().startMasterOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);
final String sourceSnapshot = "source-snapshot";
blockDataNode(repoName, dataNode);
final Client masterClient = internalCluster().masterClient();
final ActionFuture<CreateSnapshotResponse> sourceSnapshotFuture = masterClient.admin().cluster()
.prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute();
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
// restarting the blocked data node leaves the snapshot PARTIAL
internalCluster().restartNode(dataNode);
assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
final SnapshotException sne = expectThrows(SnapshotException.class, () -> startClone(masterClient, repoName, sourceSnapshot,
"target-snapshot", testIndex).actionGet(TimeValue.timeValueSeconds(30L)));
assertThat(sne.getMessage(), containsString("Can't clone index [" + getRepositoryData(repoName).resolveIndexId(testIndex) +
"] because its snapshot was not successful."));
}
// Starts a clone through a data-node client so the request is routed via a node that survives a master restart.
private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
String... indices) {
return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);
}
// Starts a clone through the default test client.
private ActionFuture<AcknowledgedResponse> startClone(String repoName, String sourceSnapshot, String targetSnapshot,
String... indices) {
return startClone(client(), repoName, sourceSnapshot, targetSnapshot, indices);
}
// Kicks off a clone-snapshot request via the given client without waiting for completion.
private static ActionFuture<AcknowledgedResponse> startClone(Client client, String repoName, String sourceSnapshot,
String targetSnapshot, String... indices) {
return client.admin().cluster().prepareCloneSnapshot(repoName, sourceSnapshot, targetSnapshot).setIndices(indices).execute();
}
// Configures the master's MockRepository to block when reading index metadata, i.e. during the
// first step of a clone operation.
private void blockMasterOnReadIndexMeta(String repoName) {
((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
.setBlockOnReadIndexMeta();
}
// Configures the master's MockRepository to block when writing shard-level metadata, i.e. during
// the shard clone step.
private void blockMasterOnShardClone(String repoName) {
((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
.setBlockOnWriteShardLevelMeta();
}
/**
 * Asserts that the given {@link RepositoryData} holds exactly {@code successfulSnapshotCount}
 * snapshots and that every one of them is in state {@code SUCCESS}.
 */
private static void assertAllSnapshotsSuccessful(RepositoryData repositoryData, int successfulSnapshotCount) {
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
assertThat(snapshotIds, hasSize(successfulSnapshotCount));
snapshotIds.forEach(snapshotId -> assertThat(repositoryData.getSnapshotState(snapshotId), is(SnapshotState.SUCCESS)));
}
private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
String generation) {
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,

View File

@ -64,6 +64,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsActi
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
@ -267,6 +269,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionA
import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.RestCleanupRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.elasticsearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterHealthAction;
@ -522,6 +525,7 @@ public class ActionModule extends AbstractModule {
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
@ -665,6 +669,7 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestCleanupRepositoryAction());
registerHandler.accept(new RestGetSnapshotsAction());
registerHandler.accept(new RestCreateSnapshotAction());
registerHandler.accept(new RestCloneSnapshotAction());
registerHandler.accept(new RestRestoreSnapshotAction());
registerHandler.accept(new RestDeleteSnapshotAction());
registerHandler.accept(new RestSnapshotsStatusAction());

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.snapshots.clone;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
/**
 * Action type for the clone snapshot API. Responds with a plain {@link AcknowledgedResponse}.
 */
public final class CloneSnapshotAction extends ActionType<AcknowledgedResponse> {

    /** Action name used for transport routing and privilege checks. */
    public static final String NAME = "cluster:admin/snapshot/clone";

    /** Singleton instance; the class is not instantiable elsewhere. */
    public static final CloneSnapshotAction INSTANCE = new CloneSnapshotAction();

    private CloneSnapshotAction() {
        super(NAME, AcknowledgedResponse::new);
    }
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.snapshots.clone;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
 * Transport request for cloning part or all of an existing snapshot into a new snapshot within
 * the same repository.
 */
public class CloneSnapshotRequest extends MasterNodeRequest<CloneSnapshotRequest> implements IndicesRequest.Replaceable {

    // Repository that holds the source snapshot and will hold the target snapshot.
    private final String repository;

    // Name of the existing snapshot to clone from.
    private final String source;

    // Name of the snapshot to create.
    private final String target;

    // Index name patterns to clone; mutable because IndicesRequest.Replaceable allows resolution in place.
    private String[] indices;

    private IndicesOptions indicesOptions = IndicesOptions.strictExpandHidden();

    public CloneSnapshotRequest(StreamInput in) throws IOException {
        super(in);
        repository = in.readString();
        source = in.readString();
        target = in.readString();
        indices = in.readStringArray();
        indicesOptions = IndicesOptions.readIndicesOptions(in);
    }

    /**
     * Creates a clone snapshot request for cloning the given source snapshot's indices into the given target snapshot on the given
     * repository.
     *
     * @param repository repository that source snapshot belongs to and that the target snapshot will be created in
     * @param source     source snapshot name
     * @param target     target snapshot name
     * @param indices    indices to clone from source to target
     */
    public CloneSnapshotRequest(String repository, String source, String target, String[] indices) {
        this.repository = repository;
        this.source = source;
        this.target = target;
        this.indices = indices;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(repository);
        out.writeString(source);
        out.writeString(target);
        out.writeStringArray(indices);
        indicesOptions.writeIndicesOptions(out);
    }

    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
        if (source == null) {
            validationException = addValidationError("source snapshot name is missing", validationException);
        }
        if (target == null) {
            // fixed: previously passed null here, which silently dropped the source-name error
            // when both source and target were missing
            validationException = addValidationError("target snapshot name is missing", validationException);
        }
        if (repository == null) {
            validationException = addValidationError("repository is missing", validationException);
        }
        if (indices == null) {
            validationException = addValidationError("indices is null", validationException);
        } else if (indices.length == 0) {
            validationException = addValidationError("indices patterns are empty", validationException);
        } else {
            for (String index : indices) {
                if (index == null) {
                    validationException = addValidationError("index is null", validationException);
                    break;
                }
            }
        }
        return validationException;
    }

    @Override
    public String[] indices() {
        return this.indices;
    }

    @Override
    public IndicesOptions indicesOptions() {
        return indicesOptions;
    }

    @Override
    public CloneSnapshotRequest indices(String... indices) {
        this.indices = indices;
        return this;
    }

    /**
     * @see CloneSnapshotRequestBuilder#setIndicesOptions
     */
    public CloneSnapshotRequest indicesOptions(IndicesOptions indicesOptions) {
        this.indicesOptions = indicesOptions;
        return this;
    }

    /** @return repository that both source and target snapshot belong to */
    public String repository() {
        return this.repository;
    }

    /** @return name of the snapshot to create */
    public String target() {
        return this.target;
    }

    /** @return name of the snapshot to clone from */
    public String source() {
        return this.source;
    }
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.snapshots.clone;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;
/**
 * Builder for {@link CloneSnapshotRequest}; the response is a plain {@link AcknowledgedResponse}.
 */
public class CloneSnapshotRequestBuilder extends MasterNodeOperationRequestBuilder<CloneSnapshotRequest, AcknowledgedResponse,
        CloneSnapshotRequestBuilder> {

    protected CloneSnapshotRequestBuilder(ElasticsearchClient client, ActionType<AcknowledgedResponse> action,
                                          CloneSnapshotRequest request) {
        super(client, action, request);
    }

    public CloneSnapshotRequestBuilder(ElasticsearchClient client, ActionType<AcknowledgedResponse> action,
                                       String repository, String source, String target) {
        // start with an empty index list; callers narrow it via setIndices
        this(client, action, new CloneSnapshotRequest(repository, source, target, Strings.EMPTY_ARRAY));
    }

    /**
     * Sets a list of indices that should be cloned from the source to the target snapshot
     * <p>
     * The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will clone all indices with
     * prefix "test" except index "test42".
     *
     * @return this builder
     */
    public CloneSnapshotRequestBuilder setIndices(String... indices) {
        request.indices(indices);
        return this;
    }

    /**
     * Specifies the indices options. Like what type of requested indices to ignore. For example indices that don't exist.
     *
     * @param indicesOptions the desired behaviour regarding indices options
     * @return this request
     */
    public CloneSnapshotRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
        request.indicesOptions(indicesOptions);
        return this;
    }
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.snapshots.clone;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* Transport action for the clone snapshot operation.
*/
public final class TransportCloneSnapshotAction extends TransportMasterNodeAction<CloneSnapshotRequest, AcknowledgedResponse> {
private final SnapshotsService snapshotsService;
@Inject
public TransportCloneSnapshotAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(CloneSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, CloneSnapshotRequest::new,
indexNameExpressionResolver);
this.snapshotsService = snapshotsService;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
protected void masterOperation(CloneSnapshotRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
}
@Override
protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

View File

@ -71,6 +71,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@ -506,7 +508,22 @@ public interface ClusterAdminClient extends ElasticsearchClient {
CreateSnapshotRequestBuilder prepareCreateSnapshot(String repository, String name);
/**
* Get snapshot.
* Clones a snapshot.
*/
CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target);
/**
* Clones a snapshot.
*/
ActionFuture<AcknowledgedResponse> cloneSnapshot(CloneSnapshotRequest request);
/**
* Clones a snapshot.
*/
void cloneSnapshot(CloneSnapshotRequest request, ActionListener<AcknowledgedResponse> listener);
/**
* Get snapshots.
*/
ActionFuture<GetSnapshotsResponse> getSnapshots(GetSnapshotsRequest request);

View File

@ -94,6 +94,9 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
@ -959,6 +962,21 @@ public abstract class AbstractClient implements Client {
return new CreateSnapshotRequestBuilder(this, CreateSnapshotAction.INSTANCE, repository, name);
}
@Override
public CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target) {
return new CloneSnapshotRequestBuilder(this, CloneSnapshotAction.INSTANCE, repository, source, target);
}
@Override
public ActionFuture<AcknowledgedResponse> cloneSnapshot(CloneSnapshotRequest request) {
return execute(CloneSnapshotAction.INSTANCE, request);
}
@Override
public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<AcknowledgedResponse> listener) {
execute(CloneSnapshotAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetSnapshotsResponse> getSnapshots(GetSnapshotsRequest request) {
return execute(GetSnapshotsAction.INSTANCE, request);

View File

@ -35,8 +35,10 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
@ -101,11 +103,32 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
}
/**
 * Creates the initial snapshot clone entry.
 * <p>
 * The entry is created in {@link State#STARTED} with global state included and non-partial
 * (the {@code true, false} constructor arguments), with empty shard and clone status maps
 * and an empty data stream list; per-shard clone states are added later via
 * {@link Entry#withClones}.
 *
 * @param snapshot snapshot to clone into
 * @param source snapshot to clone from
 * @param indices indices to clone
 * @param startTime start time
 * @param repositoryStateId repository state id that this clone is based on
 * @param version repository metadata version to write
 * @return snapshot clone entry
 */
public static Entry startClone(Snapshot snapshot, SnapshotId source, List<IndexId> indices, long startTime,
                               long repositoryStateId, Version version) {
    return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(),
        startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source,
        ImmutableOpenMap.of());
}
public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
*/
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<IndexId> indices;
private final List<String> dataStreams;
@ -113,6 +136,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final long repositoryStateId;
// see #useShardGenerations
private final Version version;
/**
* Source snapshot if this is a clone operation or {@code null} if this is a snapshot.
*/
@Nullable
private final SnapshotId source;
/**
* Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry
* the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries.
*/
private final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones;
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;
@ -121,6 +157,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version) {
this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure,
userMetadata, version, null, ImmutableOpenMap.of());
}
private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version, @Nullable SnapshotId source,
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@ -129,11 +174,18 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
this.dataStreams = dataStreams;
this.startTime = startTime;
this.shards = shards;
assert assertShardsConsistent(state, indices, shards);
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.version = version;
this.source = source;
if (source == null) {
assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source";
this.clones = ImmutableOpenMap.of();
} else {
this.clones = clones;
}
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}
private Entry(StreamInput in) throws IOException {
@ -143,7 +195,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
state = State.fromValue(in.readByte());
indices = in.readList(IndexId::new);
startTime = in.readLong();
shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::new);
shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom);
repositoryStateId = in.readLong();
failure = in.readOptionalString();
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
@ -166,21 +218,41 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
} else {
dataStreams = Collections.emptyList();
}
if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
source = in.readOptionalWriteable(SnapshotId::new);
clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
} else {
source = null;
clones = ImmutableOpenMap.of();
}
}
private static boolean assertShardsConsistent(State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private static boolean assertShardsConsistent(SnapshotId source, State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
}
final Set<String> indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
final Set<String> indexNamesInShards = new HashSet<>();
shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
assert indexNames.equals(indexNamesInShards)
shards.iterator().forEachRemaining(s -> {
indexNamesInShards.add(s.key.getIndexName());
assert source == null || s.value.nodeId == null :
"Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
});
assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
assert source != null || indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
final boolean shardsCompleted = completed(shards.values());
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
// Check state consistency for normal snapshots and started clone operations
if (source == null || clones.isEmpty() == false) {
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
}
if (source != null && state.completed()) {
assert hasFailures(clones) == false || state == State.FAILED
: "Failed shard clones in [" + clones + "] but state was [" + state + "]";
}
return true;
}
@ -201,7 +273,17 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
userMetadata, version);
userMetadata, version, source, clones);
}
/**
 * Creates a copy of this entry with the given shard clone statuses.
 * <p>
 * If all clone statuses in {@code updatedClones} are completed, the returned entry's state is
 * derived from them: {@link State#FAILED} if any clone failed, {@link State#SUCCESS} otherwise.
 * If not all are completed, the current state is retained unchanged.
 *
 * @param updatedClones new per-shard clone statuses
 * @return this entry if the statuses are unchanged, otherwise a new entry with the updated statuses
 */
public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
    // No-op update: avoid allocating an identical entry
    if (updatedClones.equals(clones)) {
        return this;
    }
    return new Entry(snapshot, includeGlobalState, partial,
        completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) :
        state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source,
        updatedClones);
}
/**
@ -230,7 +312,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State state, String failure) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
failure, userMetadata, version);
failure, userMetadata, version, source, clones);
}
/**
@ -318,6 +400,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return version;
}
@Nullable
public SnapshotId source() {
return source;
}
public boolean isClone() {
return source != null;
}
public ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones() {
return clones;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -334,6 +429,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;
return true;
}
@ -349,6 +446,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
return result;
}
@ -418,6 +517,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
out.writeStringCollection(dataStreams);
}
if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
out.writeOptionalWriteable(source);
out.writeMap(clones);
}
}
@Override
@ -441,6 +544,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return true;
}
/**
 * Checks whether any of the given shard clone statuses is in a failed state.
 */
private static boolean hasFailures(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
    for (ObjectCursor<ShardSnapshotStatus> cursor : clones.values()) {
        if (cursor.value.state().failed()) {
            return true;
        }
    }
    return false;
}
public static class ShardSnapshotStatus implements Writeable {
/**
@ -490,15 +602,20 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return true;
}
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException {
String nodeId = in.readOptionalString();
final ShardState state = ShardState.fromValue(in.readByte());
final String generation;
if (SnapshotsService.useShardGenerations(in.getVersion())) {
generation = in.readOptionalString();
} else {
generation = null;
}
reason = in.readOptionalString();
final String reason = in.readOptionalString();
if (state == ShardState.QUEUED) {
return UNASSIGNED_QUEUED;
}
return new ShardSnapshotStatus(nodeId, state, reason, generation);
}
public ShardState state() {

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
/**
* Clones indices from one snapshot into another snapshot in the same repository
*/
/**
 * REST handler that clones indices from one snapshot into another snapshot in the same
 * repository via {@code PUT /_snapshot/{repository}/{snapshot}/_clone/{target_snapshot}}.
 */
public class RestCloneSnapshotAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return Collections.singletonList(new Route(PUT, "/_snapshot/{repository}/{snapshot}/_clone/{target_snapshot}"));
    }

    @Override
    public String getName() {
        return "clone_snapshot_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        // The request body holds the list of indices to clone (and optionally indices options)
        final Map<String, Object> body = request.contentParser().map();
        final CloneSnapshotRequest cloneRequest = new CloneSnapshotRequest(
            request.param("repository"), request.param("snapshot"), request.param("target_snapshot"),
            XContentMapValues.nodeStringArrayValue(body.getOrDefault("indices", Collections.emptyList())));
        cloneRequest.masterNodeTimeout(request.paramAsTime("master_timeout", cloneRequest.masterNodeTimeout()));
        cloneRequest.indicesOptions(IndicesOptions.fromMap(body, cloneRequest.indicesOptions()));
        return channel -> client.admin().cluster().cloneSnapshot(cloneRequest, new RestToXContentListener<>(channel));
    }
}

View File

@ -924,7 +924,7 @@ public class RestoreService implements ClusterStateApplier {
}
}
private static boolean failed(SnapshotInfo snapshot, String index) {
public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
return true;

View File

@ -197,6 +197,10 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
final String localNodeId = clusterService.localNode().getId();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final State entryState = entry.state();
if (entry.isClone()) {
// This is a snapshot clone, it will be executed on the current master
continue;
}
if (entryState == State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = null;
final Snapshot snapshot = entry.snapshot();

View File

@ -28,9 +28,12 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -44,6 +47,7 @@ import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
@ -105,6 +109,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -128,6 +134,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public static final Version FULL_CONCURRENCY_VERSION = Version.V_7_9_0;
public static final Version CLONE_SNAPSHOT_VERSION = Version.V_7_10_0;
public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0;
@ -166,6 +174,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Set of snapshots that are currently being ended by this node
private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
// Set of currently initializing clone operations
private final Set<Snapshot> initializingClones = Collections.synchronizedSet(new HashSet<>());
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
private final TransportService transportService;
@ -363,20 +374,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public ClusterState execute(ClusterState currentState) {
// check if the snapshot name already exists in the repository
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
}
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
if (runningSnapshots.stream().anyMatch(s -> {
final Snapshot running = s.snapshot();
return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
})) {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name is already in-progress");
}
ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
validate(repositoryName, snapshotName, currentState);
final boolean concurrentOperationsAllowed = currentState.nodes().getMinNodeVersion().onOrAfter(FULL_CONCURRENCY_VERSION);
final SnapshotDeletionsInProgress deletionsInProgress =
@ -397,6 +398,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
@ -407,9 +409,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
final List<IndexId> indexIds = repositoryData.resolveNewIndices(
indices, runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
.flatMap(entry -> entry.indices().stream()).distinct()
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
indices, getInFlightIndexIds(runningSnapshots, repositoryName));
final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(snapshots, deletionsInProgress, currentState.metadata(),
currentState.routingTable(), indexIds, useShardGenerations(version), repositoryData, repositoryName);
@ -459,6 +459,278 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}
/**
 * Throws {@link InvalidSnapshotNameException} if a snapshot with the given name is already
 * running in the given repository.
 */
private static void ensureSnapshotNameNotRunning(List<SnapshotsInProgress.Entry> runningSnapshots, String repositoryName,
                                                 String snapshotName) {
    for (SnapshotsInProgress.Entry entry : runningSnapshots) {
        final Snapshot running = entry.snapshot();
        if (running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName)) {
            throw new InvalidSnapshotNameException(repositoryName, snapshotName, "snapshot with the same name is already in-progress");
        }
    }
}
/**
 * Builds a map from index name to {@link IndexId} for every index referenced by any running
 * snapshot operation in the given repository.
 * <p>
 * NOTE: {@link Collectors#toMap(java.util.function.Function, java.util.function.Function)} throws
 * {@code IllegalStateException} on duplicate keys, so distinct {@link IndexId}s sharing a name
 * would fail here rather than be silently merged.
 *
 * @param runningSnapshots currently running snapshot operations
 * @param repositoryName   repository to collect in-flight index ids for
 * @return map of index name to in-flight index id
 */
private static Map<String, IndexId> getInFlightIndexIds(List<SnapshotsInProgress.Entry> runningSnapshots, String repositoryName) {
    return runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
        .flatMap(entry -> entry.indices().stream()).distinct()
        .collect(Collectors.toMap(IndexId::getName, Function.identity()));
}
// TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache
// for repository metadata and loading it has predictable performance
/**
 * Starts a clone operation by adding a new clone entry to the cluster state.
 * <p>
 * All validation (target name availability, no conflicting cleanup/delete operations, source snapshot
 * exists, requested index patterns match at least one index) runs inside a repository-consistent
 * cluster state update. Once the entry has been added, {@link #startCloning} resolves the per-shard
 * work on the repository.
 *
 * @param request  clone request holding repository, source and target snapshot names and index filters
 * @param listener resolved once the clone operation has fully started (not when it completes)
 */
public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> listener) {
    final String repositoryName = request.repository();
    Repository repository = repositoriesService.repository(repositoryName);
    if (repository.isReadOnly()) {
        listener.onFailure(new RepositoryException(repositoryName, "cannot create snapshot in a readonly repository"));
        return;
    }
    // Resolve date math (e.g. <snap-{now/d}>) in the target name before validating it
    final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.target());
    validate(repositoryName, snapshotName);
    final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
    final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
    // Track the clone as initializing until it is fully set up so concurrent operations can account for it
    initializingClones.add(snapshot);
    repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() {

        private SnapshotsInProgress.Entry newEntry;

        @Override
        public ClusterState execute(ClusterState currentState) {
            ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
            ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
            final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
            final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
            ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
            validate(repositoryName, snapshotName, currentState);
            // Resolve the source snapshot by name; it must already exist in the repository
            final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds()
                .stream()
                .filter(src -> src.getName().equals(request.source()))
                .findAny()
                .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source()));
            final SnapshotDeletionsInProgress deletionsInProgress =
                currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
            if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) {
                throw new ConcurrentSnapshotExecutionException(repositoryName, sourceSnapshotId.getName(),
                    "cannot clone from snapshot that is being deleted");
            }
            ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
            // Indices contained in the source snapshot form the universe the request's index filters select from
            final List<String> indicesForSnapshot = new ArrayList<>();
            for (IndexId indexId : repositoryData.getIndices().values()) {
                if (repositoryData.getSnapshots(indexId).contains(sourceSnapshotId)) {
                    indicesForSnapshot.add(indexId.getName());
                }
            }
            final List<String> matchingIndices =
                SnapshotUtils.filterIndices(indicesForSnapshot, request.indices(), request.indicesOptions());
            if (matchingIndices.isEmpty()) {
                throw new SnapshotException(new Snapshot(repositoryName, sourceSnapshotId),
                    "No indices in the source snapshot [" + sourceSnapshotId + "] matched requested pattern ["
                        + Strings.arrayToCommaDelimitedString(request.indices()) + "]");
            }
            newEntry = SnapshotsInProgress.startClone(
                snapshot, sourceSnapshotId,
                repositoryData.resolveIndices(matchingIndices),
                threadPool.absoluteTimeInMillis(), repositoryData.getGenId(),
                minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null));
            final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(runningSnapshots);
            newEntries.add(newEntry);
            return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
                SnapshotsInProgress.of(newEntries)).build();
        }

        @Override
        public void onFailure(String source, Exception e) {
            initializingClones.remove(snapshot);
            logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot", repositoryName, snapshotName), e);
            listener.onFailure(e);
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
            logger.info("snapshot clone [{}] started", snapshot);
            // Resolve the caller's listener once the clone has started; overall completion is tracked via snapshot listeners
            addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
            startCloning(repository, newEntry);
        }

        @Override
        public TimeValue timeout() {
            // NOTE(review): this accessor must stay side-effect free. The previous code removed `snapshot` from
            // initializingClones here, which would drop the tracking entry as soon as the master service queried the
            // task's timeout, defeating the tracking. Failure-path removal happens in onFailure; success-path removal
            // is expected once cloning is fully set up in the subsequent state update — TODO confirm against startCloning.
            return request.masterNodeTimeout();
        }
    }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
}
/**
 * Ensures that no repository cleanup is currently running for the given repository.
 *
 * @param currentState   current cluster state to check for a running cleanup
 * @param repositoryName repository that the snapshot operation targets
 * @param snapshotName   name of the snapshot operation, used in the exception message
 * @throws ConcurrentSnapshotExecutionException if a repository cleanup is in progress
 */
private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
    final RepositoryCleanupInProgress cleanupInProgress =
        currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
    if (cleanupInProgress.hasCleanupInProgress() == false) {
        return;
    }
    throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
        "cannot snapshot while a repository cleanup is in-progress in [" + cleanupInProgress + "]");
}
/**
 * Ensures that no snapshot with the given name already exists in the repository.
 *
 * @param repositoryData current repository data listing the existing snapshot ids
 * @param snapshotName   snapshot name that the new operation wants to use
 * @param repository     repository the operation targets, used in the exception message
 * @throws InvalidSnapshotNameException if the name is already taken in the repository
 */
private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) {
    // fail fast if the target name collides with any snapshot already in the repository
    for (SnapshotId existing : repositoryData.getSnapshotIds()) {
        if (existing.getName().equals(snapshotName)) {
            throw new InvalidSnapshotNameException(
                repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
        }
    }
}
/**
 * Determine the number of shards in each index of a clone operation and update the cluster state accordingly.
 *
 * <p>Runs in three asynchronous steps: (1) load the source {@code SnapshotInfo} to verify the source snapshot
 * succeeded for every index to clone, (2) load the shard count of each index from its snapshotted index metadata,
 * (3) submit a consistent cluster state update that records one clone job per shard on the clone entry. Any failure
 * along the way removes the clone from {@code initializingClones} and fails it in the cluster state.
 *
 * @param repository repository to run operation on
 * @param cloneEntry clone operation in the cluster state
 */
private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
    final List<IndexId> indices = cloneEntry.indices();
    final SnapshotId sourceSnapshot = cloneEntry.source();
    final Snapshot targetSnapshot = cloneEntry.snapshot();

    final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

    // Exception handler for IO exceptions with loading index and repo metadata
    final Consumer<Exception> onFailure = e -> {
        initializingClones.remove(targetSnapshot);
        logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
        removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null);
    };

    // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
    // TODO: we could skip this step for snapshots with state SUCCESS
    final StepListener<SnapshotInfo> snapshotInfoListener = new StepListener<>();
    executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshot)));

    // Fans the per-index shard-count loads back into a single collection once all indices have been resolved
    final StepListener<Collection<Tuple<IndexId, Integer>>> allShardCountsListener = new StepListener<>();
    final GroupedActionListener<Tuple<IndexId, Integer>> shardCountListener =
        new GroupedActionListener<>(allShardCountsListener, indices.size());
    snapshotInfoListener.whenComplete(snapshotInfo -> {
        // refuse to clone an index whose snapshot in the source did not fully succeed
        for (IndexId indexId : indices) {
            if (RestoreService.failed(snapshotInfo, indexId.getName())) {
                throw new SnapshotException(targetSnapshot, "Can't clone index [" + indexId +
                    "] because its snapshot was not successful.");
            }
        }
        // 2. step, load the number of shards we have in each index to be cloned from the index metadata.
        repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
            for (IndexId index : indices) {
                executor.execute(ActionRunnable.supply(shardCountListener, () -> {
                    final IndexMetadata metadata = repository.getSnapshotIndexMetaData(repositoryData, sourceSnapshot, index);
                    return Tuple.tuple(index, metadata.getNumberOfShards());
                }));
            }
        }, onFailure));
    }, onFailure);

    // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snap entry
    allShardCountsListener.whenComplete(counts -> repository.executeConsistentStateUpdate(repoData -> new ClusterStateUpdateTask() {

        // Entry as it was written to the cluster state with clone jobs attached; null if the entry was not found
        private SnapshotsInProgress.Entry updatedEntry;

        @Override
        public ClusterState execute(ClusterState currentState) {
            final SnapshotsInProgress snapshotsInProgress =
                currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
            final List<SnapshotsInProgress.Entry> updatedEntries = new ArrayList<>(snapshotsInProgress.entries());
            boolean changed = false;
            final String localNodeId = currentState.nodes().getLocalNodeId();
            final String repoName = cloneEntry.repository();
            final Map<String, IndexId> indexIds = getInFlightIndexIds(updatedEntries, repoName);
            final ShardGenerations shardGenerations = repoData.shardGenerations();
            for (int i = 0; i < updatedEntries.size(); i++) {
                if (cloneEntry.equals(updatedEntries.get(i))) {
                    final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder =
                        ImmutableOpenMap.builder();
                    // TODO: could be optimized by just dealing with repo shard id directly
                    // shards that some other in-progress operation on this repo is already working on; clones
                    // for those shards are queued instead of started immediately
                    final Set<RepositoryShardId> busyShardsInRepo =
                        busyShardsForRepo(repoName, snapshotsInProgress, currentState.metadata())
                            .stream()
                            .map(shardId -> new RepositoryShardId(indexIds.get(shardId.getIndexName()), shardId.getId()))
                            .collect(Collectors.toSet());
                    for (Tuple<IndexId, Integer> count : counts) {
                        for (int shardId = 0; shardId < count.v2(); shardId++) {
                            final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
                            if (busyShardsInRepo.contains(repoShardId)) {
                                clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
                            } else {
                                clonesBuilder.put(repoShardId,
                                    new ShardSnapshotStatus(localNodeId, shardGenerations.getShardGen(count.v1(), shardId)));
                            }
                        }
                    }
                    updatedEntry = cloneEntry.withClones(clonesBuilder.build());
                    updatedEntries.set(i, updatedEntry);
                    changed = true;
                    break;
                }
            }
            return updateWithSnapshots(currentState, changed ? SnapshotsInProgress.of(updatedEntries) : null, null);
        }

        @Override
        public void onFailure(String source, Exception e) {
            initializingClones.remove(targetSnapshot);
            logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
            failAllListenersOnMasterFailOver(e);
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            initializingClones.remove(targetSnapshot);
            if (updatedEntry != null) {
                final Snapshot target = updatedEntry.snapshot();
                final SnapshotId sourceSnapshot = updatedEntry.source();
                // kick off the shard clones that were assigned to this (master) node in INIT state above
                for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> indexClone : updatedEntry.clones()) {
                    final ShardSnapshotStatus shardStatusBefore = indexClone.value;
                    if (shardStatusBefore.state() != ShardState.INIT) {
                        continue;
                    }
                    final RepositoryShardId repoShardId = indexClone.key;
                    runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository);
                }
            } else {
                // Extremely unlikely corner case of master failing over between starting the clone and
                // starting shard clones.
                logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
            }
        }
    }, "start snapshot clone", onFailure), onFailure);
}
// Shard clone operations currently executing against the repository; guards against starting the same
// shard clone twice from concurrent cluster state applications
private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());

/**
 * Executes a single ready shard clone in the repository and reports its outcome back into the cluster state.
 *
 * <p>Only runs the clone if the shard is not already being cloned ({@link #currentlyCloning}). On completion,
 * whether successful or failed, the shard's state is updated via {@code innerUpdateSnapshotState} and the shard
 * is removed from {@link #currentlyCloning} before the state-update listener resolves.
 *
 * @param target           target snapshot being created by the clone
 * @param sourceSnapshot   source snapshot the shard data is cloned from
 * @param shardStatusBefore status of the shard clone before execution, supplies the existing shard generation
 * @param repoShardId      repository-level id of the shard to clone
 * @param repository       repository to execute the clone against
 */
private void runReadyClone(Snapshot target, SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore,
                           RepositoryShardId repoShardId, Repository repository) {
    final SnapshotId targetSnapshot = target.getSnapshotId();
    final String localNodeId = clusterService.localNode().getId();
    // add() returning false means another invocation is already cloning this shard, so do nothing
    if (currentlyCloning.add(repoShardId)) {
        repository.cloneShardSnapshot(sourceSnapshot, targetSnapshot, repoShardId, shardStatusBefore.generation(), ActionListener.wrap(
            // success: mark the shard SUCCESS with the new generation returned by the repository
            generation -> innerUpdateSnapshotState(
                new ShardSnapshotUpdate(target, repoShardId,
                    new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
                ActionListener.runBefore(
                    ActionListener.wrap(
                        v -> logger.trace("Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId,
                            sourceSnapshot, targetSnapshot),
                        e -> {
                            logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
                            // state-update failure here likely means this node lost mastership
                            failAllListenersOnMasterFailOver(e);
                        }
                    ), () -> currentlyCloning.remove(repoShardId))
            ),
            // failure: mark the shard FAILED; generation stays null since no new data was written
            e -> innerUpdateSnapshotState(
                new ShardSnapshotUpdate(target, repoShardId,
                    new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)),
                ActionListener.runBefore(ActionListener.wrap(
                    v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId,
                        sourceSnapshot, targetSnapshot),
                    ex -> {
                        logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId);
                        failAllListenersOnMasterFailOver(ex);
                    }
                ), () -> currentlyCloning.remove(repoShardId)))));
    }
}
private void ensureBelowConcurrencyLimit(String repository, String name, SnapshotsInProgress snapshotsInProgress,
SnapshotDeletionsInProgress deletionsInProgress) {
final int inProgressOperations = snapshotsInProgress.entries().size() + deletionsInProgress.getEntries().size();
@ -695,17 +967,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
ShardGenerations.Builder builder = ShardGenerations.builder();
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
snapshot.shards().forEach(c -> {
if (metadata.index(c.key.getIndex()) == null) {
assert snapshot.partial() :
"Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
return;
}
final IndexId indexId = indexLookup.get(c.key.getIndexName());
if (indexId != null) {
builder.put(indexId, c.key.id(), c.value.generation());
}
});
if (snapshot.isClone()) {
snapshot.clones().forEach(c -> {
final IndexId indexId = indexLookup.get(c.key.indexName());
builder.put(indexId, c.key.shardId(), c.value.generation());
});
} else {
snapshot.shards().forEach(c -> {
if (metadata.index(c.key.getIndex()) == null) {
assert snapshot.partial() :
"Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
return;
}
final IndexId indexId = indexLookup.get(c.key.getIndexName());
if (indexId != null) {
builder.put(indexId, c.key.id(), c.value.generation());
}
});
}
return builder.build();
}
@ -934,17 +1213,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (statesToUpdate.contains(snapshot.state())) {
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
if (shards != null) {
final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
changed = true;
if (updatedSnapshot.state().completed()) {
finishedSnapshots.add(updatedSnapshot);
// Currently initializing clone
if (snapshot.isClone() && snapshot.clones().isEmpty()) {
if (initializingClones.contains(snapshot.snapshot())) {
updatedSnapshotEntries.add(snapshot);
} else {
logger.debug("removing not yet start clone operation [{}]", snapshot);
changed = true;
}
updatedSnapshotEntries.add(updatedSnapshot);
} else {
updatedSnapshotEntries.add(snapshot);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
if (shards != null) {
final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
changed = true;
if (updatedSnapshot.state().completed()) {
finishedSnapshots.add(updatedSnapshot);
}
updatedSnapshotEntries.add(updatedSnapshot);
} else {
updatedSnapshotEntries.add(snapshot);
}
}
} else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
// BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet
@ -996,6 +1285,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
}
}
startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
// run newly ready deletes
for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) {
if (tryEnterRepoLoop(entry.repository())) {
@ -1165,6 +1455,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), repositoryData, null);
return;
}
if (entry.isClone() && entry.state() == State.FAILED) {
logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null, null);
return;
}
final String repoName = entry.repository();
if (tryEnterRepoLoop(repoName)) {
if (repositoryData == null) {
@ -1238,10 +1533,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
entry.includeGlobalState(), entry.userMetadata());
repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot(
final StepListener<Metadata> metadataListener = new StepListener<>();
final Repository repo = repositoriesService.repository(snapshot.getRepository());
if (entry.isClone()) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(
ActionRunnable.supply(metadataListener, () -> {
final Metadata.Builder metaBuilder = Metadata.builder(repo.getSnapshotGlobalMetadata(entry.source()));
for (IndexId index : entry.indices()) {
metaBuilder.put(repo.getSnapshotIndexMetaData(repositoryData, entry.source(), index), false);
}
return metaBuilder.build();
}));
} else {
metadataListener.onResponse(metadata);
}
metadataListener.whenComplete(meta -> repo.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(entry, metadata),
repositoryData.getGenId(),
metadataForSnapshot(entry, meta),
snapshotInfo,
entry.version(),
state -> stateWithoutSnapshot(state, snapshot),
@ -1251,7 +1560,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
snapshotCompletionListeners.remove(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
runNextQueuedOperation(newRepoData, repository, true);
}, e -> handleFinalizationFailure(e, entry, repositoryData)));
}, e -> handleFinalizationFailure(e, entry, repositoryData))),
e -> handleFinalizationFailure(e, entry, repositoryData));
} catch (Exception e) {
assert false : new AssertionError(e);
handleFinalizationFailure(e, entry, repositoryData);
@ -1428,8 +1738,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData,
@Nullable CleanupAfterErrorListener listener) {
assert failure != null : "Failure must be supplied";
assert (listener == null || repositoryData == null) && (listener == null && repositoryData == null) == false :
"Either repository data or a listener but not both must be null but saw [" + listener + "] and [" + repositoryData + "]";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
@ -1466,7 +1774,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
failSnapshotCompletionListeners(snapshot, failure);
if (listener == null) {
runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
if (repositoryData != null) {
runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
}
} else {
listener.onFailure(null);
}
@ -1519,11 +1829,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {
final String[] snapshotNames = request.snapshots();
final String repositoryName = request.repository();
final String repoName = request.repository();
logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));
Strings.arrayToCommaDelimitedString(snapshotNames), repoName));
final Repository repository = repositoriesService.repository(repositoryName);
final Repository repository = repositoriesService.repository(repoName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) {
private Snapshot runningSnapshot;
@ -1543,12 +1853,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
+ "]");
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repositoryName);
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
final List<SnapshotId> snapshotIds = matchingSnapshotIds(
snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()), repositoryData,
snapshotNames, repositoryName);
snapshotNames, repoName);
if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) {
deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData, Priority.NORMAL, listener);
deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener);
return deleteFromRepoTask.execute(currentState);
}
assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries;
@ -1629,7 +1939,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
listener.onResponse(null);
} else {
clusterService.submitStateUpdateTask("delete snapshot",
createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData, Priority.IMMEDIATE, listener));
createDeleteStateUpdate(outstandingDeletes, repoName, repositoryData, Priority.IMMEDIATE, listener));
}
return;
}
@ -1638,7 +1948,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
result -> {
logger.debug("deleted snapshot completed - deleting files");
clusterService.submitStateUpdateTask("delete snapshot",
createDeleteStateUpdate(outstandingDeletes, repositoryName, result.v1(), Priority.IMMEDIATE, listener));
createDeleteStateUpdate(outstandingDeletes, repoName, result.v1(), Priority.IMMEDIATE, listener));
},
e -> {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
@ -1758,6 +2068,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final Set<SnapshotId> activeCloneSources = snapshots.entries()
.stream()
.filter(SnapshotsInProgress.Entry::isClone)
.map(SnapshotsInProgress.Entry::source)
.collect(Collectors.toSet());
for (SnapshotId snapshotId : snapshotIds) {
if (activeCloneSources.contains(snapshotId)) {
throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotId),
"cannot delete snapshot while it is being cloned");
}
}
final SnapshotsInProgress updatedSnapshots;
if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) {
updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream()
@ -2266,7 +2587,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
RepositoryData repositoryData, String repoName) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final Set<ShardId> inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress);
final Set<ShardId> inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress, metadata);
final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream()
.noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED);
for (IndexId index : indices) {
@ -2330,16 +2651,32 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param snapshots snapshots in progress
* @return shard ids that currently have an actively executing shard snapshot on a data node
*/
private static Set<ShardId> busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots) {
private static Set<ShardId> busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots, Metadata metadata) {
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots == null ? Collections.emptyList() : snapshots.entries();
final Set<ShardId> inProgressShards = new HashSet<>();
for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) {
if (runningSnapshot.repository().equals(repoName) == false) {
continue;
}
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : runningSnapshot.shards()) {
if (shard.value.isActive()) {
inProgressShards.add(shard.key);
if (runningSnapshot.isClone()) {
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : runningSnapshot.clones()) {
final ShardSnapshotStatus shardState = clone.value;
if (shardState.isActive()) {
IndexMetadata indexMeta = metadata.index(clone.key.indexName());
final Index index;
if (indexMeta == null) {
index = new Index(clone.key.indexName(), IndexMetadata.INDEX_UUID_NA_VALUE);
} else {
index = indexMeta.getIndex();
}
inProgressShards.add(new ShardId(index, clone.key.shardId()));
}
}
} else {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : runningSnapshot.shards()) {
if (shard.value.isActive()) {
inProgressShards.add(shard.key);
}
}
}
}
@ -2431,97 +2768,282 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
return true;
}
private static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
/**
* Executor that applies {@link ShardSnapshotUpdate}s to the current cluster state. The algorithm implemented below works as described
* below:
* Every shard snapshot or clone state update can result in multiple snapshots being updated. In order to determine whether or not a
* shard update has an effect we use an outer loop over all current executing snapshot operations that iterates over them in the order
* they were started in and an inner loop over the list of shard update tasks.
*
* If the inner loop finds that a shard update task applies to a given snapshot and either a shard-snapshot or shard-clone operation in
* it then it will update the state of the snapshot entry accordingly. If that update was a noop, then the task is removed from the
* iteration as it was already applied before and likely just arrived on the master node again due to retries upstream.
* If the update was not a noop, then it means that the shard it applied to is now available for another snapshot or clone operation
* to be re-assigned if there is another snapshot operation that is waiting for the shard to become available. We therefore record the
* fact that a task was executed by adding it to a collection of executed tasks. If a subsequent execution of the outer loop finds that
* a task in the executed tasks collection applied to a shard it was waiting for to become available, then the shard snapshot operation
* will be started for that snapshot entry and the task removed from the collection of tasks that need to be applied to snapshot
* entries since it can not have any further effects.
*
* Package private to allow for tests.
*/
static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
int changedCount = 0;
int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
final String localNodeId = currentState.nodes().getLocalNodeId();
// Tasks to check for updates for running snapshots.
final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
// Outer loop over all snapshot entries in the order they were created in
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
// completed snapshots do not require any updates so we just add them to the new list and keep going
entries.add(entry);
continue;
}
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clones = null;
Map<String, IndexId> indicesLookup = null;
// inner loop over all the shard updates that are potentially applicable to the current snapshot entry
for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final ShardSnapshotUpdate updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
// the update applies to a different repository so it is irrelevant here
continue;
}
final ShardId finishedShardId = updateSnapshotState.shardId;
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
if (updateSnapshotState.isClone()) {
// The update applied to a shard clone operation
final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId;
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
assert entry.isClone() : "Non-clone snapshot [" + entry + "] received update for clone ["
+ updateSnapshotState + "]";
final ShardSnapshotStatus existing = entry.clones().get(finishedShardId);
if (existing == null) {
logger.warn("Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, master will not submit a state update for a non-existing clone";
continue;
}
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
continue;
}
logger.trace("[{}] Updating shard clone [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.updatedState.state());
if (clones == null) {
clones = ImmutableOpenMap.builder(entry.clones());
}
changedCount++;
clones.put(finishedShardId, updateSnapshotState.updatedState);
executedTasks.add(updateSnapshotState);
} else if (executedTasks.contains(updateSnapshotState)) {
// the update was already executed on the clone operation it applied to, now we check if it may be possible to
// start a shard snapshot or clone operation on the current entry
if (entry.isClone()) {
// current entry is a clone operation
final ShardSnapshotStatus existingStatus = entry.clones().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (clones == null) {
clones = ImmutableOpenMap.builder(entry.clones());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" + finishedStatus.nodeId() +
"] but local node id is [" + localNodeId + "]";
clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
iterator.remove();
} else {
// current entry is a snapshot operation so we must translate the repository shard id to a routing shard id
final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName());
if (indexMeta == null) {
// The index name that finished cloning does not exist in the cluster state so it isn't relevant to a
// normal snapshot
continue;
}
final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId());
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
// A clone was updated, so we must use the correct data node id for the reassignment as actual shard
// snapshot
final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone(currentState,
updateSnapshotState.updatedState.generation(), finishedRoutingShardId);
shards.put(finishedRoutingShardId, shardSnapshotStatus);
if (shardSnapshotStatus.isActive()) {
// only remove the update from the list of tasks that might hold a reusable shard if we actually
// started a snapshot and didn't just fail
iterator.remove();
}
}
}
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
continue;
} else {
// a (non-clone) shard snapshot operation was updated
final ShardId finishedShardId = updateSnapshotState.shardId;
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
}
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
continue;
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.updatedState.state());
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.updatedState);
executedTasks.add(updateSnapshotState);
changedCount++;
} else if (executedTasks.contains(updateSnapshotState)) {
// We applied the update for a shard snapshot state to its snapshot entry, now check if we can update
// either a clone or a snapshot
if (entry.isClone()) {
// Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires
// a lookup for the index ids
if (indicesLookup == null) {
indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
}
// shard snapshot was completed, we check if we can start a clone operation for the same repo shard
final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName());
// If the lookup finds the index id then at least the entry is concerned with the index id just updated
// so we check on a shard level
if (indexId != null) {
final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId());
final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (clones == null) {
clones = ImmutableOpenMap.builder(entry.clones());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation()));
iterator.remove();
startedCount++;
}
} else {
// shard snapshot was completed, we check if we can start another snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
iterator.remove();
}
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.updatedState.state());
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.updatedState);
executedTasks.add(updateSnapshotState);
changedCount++;
} else if (executedTasks.contains(updateSnapshotState)) {
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
iterator.remove();
startedCount++;
}
}
if (shards == null) {
entries.add(entry);
final SnapshotsInProgress.Entry updatedEntry;
if (shards != null) {
assert clones == null : "Should not have updated clones when updating shard snapshots but saw " + clones +
" as well as " + shards;
updatedEntry = entry.withShardStates(shards.build());
} else if (clones != null) {
updatedEntry = entry.withClones(clones.build());
} else {
entries.add(entry.withShardStates(shards.build()));
updatedEntry = entry;
}
entries.add(updatedEntry);
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(entries)).build());
}
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
};
/**
* An update to the snapshot state of a shard.
* Creates a {@link ShardSnapshotStatus} entry for a snapshot after the shard has become available for snapshotting as a result
* of a snapshot clone completing.
*
* @param currentState current cluster state
* @param shardGeneration shard generation of the shard in the repository
* @param shardId shard id of the shard that just finished cloning
* @return shard snapshot status
*/
private static final class ShardSnapshotUpdate {
/**
 * Derives the {@link ShardSnapshotStatus} for a shard snapshot that becomes runnable once the clone of the
 * same repository shard has completed, based on where (and whether) the shard's primary is allocated.
 *
 * @param currentState    current cluster state used to look up the primary's routing entry
 * @param shardGeneration shard generation of the shard in the repository
 * @param shardId         shard id of the shard that just finished cloning
 * @return shard snapshot status for the shard
 */
private static ShardSnapshotStatus startShardSnapshotAfterClone(ClusterState currentState, String shardGeneration, ShardId shardId) {
    final ShardRouting primaryRouting =
            currentState.routingTable().index(shardId.getIndex()).shard(shardId.id()).primaryShard();
    if (primaryRouting == null || primaryRouting.assignedToNode() == false) {
        // no allocated primary anywhere -> the shard snapshot cannot be executed and is recorded as missing
        return new ShardSnapshotStatus(
            null, ShardState.MISSING, "primary shard is not allocated", shardGeneration);
    }
    if (primaryRouting.relocating() || primaryRouting.initializing()) {
        // primary exists but is in flux -> wait for it to settle before snapshotting
        return new ShardSnapshotStatus(primaryRouting.currentNodeId(), ShardState.WAITING, shardGeneration);
    }
    if (primaryRouting.started() == false) {
        return new ShardSnapshotStatus(primaryRouting.currentNodeId(), ShardState.MISSING,
            "primary shard hasn't been started yet", shardGeneration);
    }
    // primary is fully started -> assign the shard snapshot to its node right away
    return new ShardSnapshotStatus(primaryRouting.currentNodeId(), shardGeneration);
}
/**
* An update to the snapshot state of a shard.
*
* Package private for testing
*/
static final class ShardSnapshotUpdate {
private final Snapshot snapshot;
private final ShardId shardId;
private final RepositoryShardId repoShardId;
private final ShardSnapshotStatus updatedState;
private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
ShardSnapshotUpdate(Snapshot snapshot, RepositoryShardId repositoryShardId, ShardSnapshotStatus updatedState) {
this.snapshot = snapshot;
this.shardId = null;
this.updatedState = updatedState;
this.repoShardId = repositoryShardId;
}
ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
this.snapshot = snapshot;
this.shardId = shardId;
this.updatedState = updatedState;
repoShardId = null;
}
public boolean isClone() {
return repoShardId != null;
}
@Override
@ -2533,13 +3055,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
return false;
}
final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
return this.snapshot.equals(that.snapshot) && Objects.equals(this.shardId, that.shardId)
&& Objects.equals(this.repoShardId, that.repoShardId) && this.updatedState == that.updatedState;
}
@Override
public int hashCode() {
return Objects.hash(snapshot, shardId, updatedState);
return Objects.hash(snapshot, shardId, updatedState, repoShardId);
}
}
@ -2568,19 +3091,35 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
} finally {
// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
// state update we check if its state is completed and end it if it is.
final SnapshotsInProgress snapshotsInProgress =
newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
if (endingSnapshots.contains(update.snapshot) == false) {
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
if (updatedEntry != null && updatedEntry.state().completed()) {
endSnapshot(updatedEntry, newState.metadata(), null);
}
}
startExecutableClones(snapshotsInProgress, update.snapshot.getRepository());
}
}
});
}
/**
 * Starts executing all shard clone operations in state {@code INIT} found in the given snapshots-in-progress
 * metadata, optionally restricted to a single repository.
 *
 * @param snapshotsInProgress current {@code SnapshotsInProgress} to scan for ready clone work
 * @param repoName            repository name to restrict the scan to, or {@code null} to scan all repositories
 */
private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nullable String repoName) {
    for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
        // only running clone operations can hold ready shard clone tasks
        if (entry.isClone() == false || entry.state() != State.STARTED) {
            continue;
        }
        if (repoName != null && entry.repository().equals(repoName) == false) {
            continue;
        }
        for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : entry.clones()) {
            // INIT marks a shard clone that has been assigned but not yet kicked off
            if (clone.value.state() == ShardState.INIT) {
                runReadyClone(entry.snapshot(), entry.source(), clone.value, clone.key,
                    repositoriesService.repository(entry.repository()));
            }
        }
    }
}
private class UpdateSnapshotStatusAction
extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,

View File

@ -41,7 +41,7 @@ public class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<Upd
super(in);
snapshot = new Snapshot(in);
shardId = new ShardId(in);
status = new SnapshotsInProgress.ShardSnapshotStatus(in);
status = SnapshotsInProgress.ShardSnapshotStatus.readFrom(in);
}
public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {

View File

@ -99,6 +99,41 @@
* update to remove the deletion's entry in {@code SnapshotDeletionsInProgress} which concludes the process of deleting a snapshot.</li>
* </ol>
*
* <h2>Cloning a Snapshot</h2>
*
* <p>Cloning part of a snapshot is a process executed entirely on the master node. On a high level, the process of cloning a snapshot is
* analogous to that of creating a snapshot from data in the cluster except that the source of data files is the snapshot repository
* instead of the data nodes. It begins with cloning all shards and then finalizes the cloned snapshot the same way a normal snapshot would
* be finalized. Concretely, it is executed as follows:</p>
*
* <ol>
* <li>First, {@link org.elasticsearch.snapshots.SnapshotsService#cloneSnapshot} is invoked which will place a placeholder entry into
* {@code SnapshotsInProgress} that does not yet contain any shard clone assignments. Note that unlike in the case of snapshot
* creation, the shard level clone tasks in {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#clones} are not created in the
* initial cluster state update as is done for shard snapshot assignments in
* {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#shards}. This is due to the fact that shard snapshot assignments are
* computed purely from information in the current cluster state while shard clone assignments require information to be read from the
* repository, which is too slow of a process to be done inside a cluster state update. Loading this information ahead of creating a
* task in the cluster state, runs the risk of race conditions where the source snapshot is being deleted before the clone task is
* enqueued in the cluster state.</li>
* <li>Once a placeholder task for the clone operation is put into the cluster state, we must determine the number of shards in each
* index that is to be cloned as well as ensure the health of the index snapshots in the source snapshot. In order to determine the
* shard count for each index that is to be cloned, we load the index metadata for each such index using the repository's
* {@link org.elasticsearch.repositories.Repository#getSnapshotIndexMetaData} method. In order to ensure the health of the source index
* snapshots, we load the {@link org.elasticsearch.snapshots.SnapshotInfo} for the source snapshot and check for shard snapshot
* failures of the relevant indices.</li>
* <li>Once all shard counts are known and the health of all source indices data has been verified, we populate the
 * {@code SnapshotsInProgress.Entry#clones} map for the clone operation with the relevant shard clone tasks.</li>
* <li>After the clone tasks have been added to the {@code SnapshotsInProgress.Entry}, master executes them on its snapshot thread-pool
* by invoking {@link org.elasticsearch.repositories.Repository#cloneShardSnapshot} for each shard that is to be cloned. Each completed
* shard snapshot triggers a call to the {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_STATE_EXECUTOR} which updates the
* clone's {@code SnapshotsInProgress.Entry} to mark the shard clone operation completed.</li>
* <li>Once all the entries in {@code SnapshotsInProgress.Entry#clones} have completed, the clone is finalized just like any other
* snapshot through {@link org.elasticsearch.snapshots.SnapshotsService#endSnapshot}. The only difference being that the metadata that
* is written out for indices and the global metadata are read from the source snapshot in the repository instead of the cluster state.
* </li>
* </ol>
*
* <h2>Concurrent Snapshot Operations</h2>
*
* Snapshot create and delete operations may be started concurrently. Operations targeting different repositories run independently of

View File

@ -0,0 +1,427 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.is;
public class SnapshotsServiceTests extends ESTestCase {
/**
 * Verifies that shard snapshot status updates that do not apply to any tracked shard — either because the
 * snapshot entry tracks no shards at all or because the update targets a different repository — leave the
 * cluster state unchanged.
 */
public void testNoopShardStateUpdates() throws Exception {
    final String repoName = "test-repo";
    final Snapshot snapshot = snapshot(repoName, "snapshot-1");
    final SnapshotsInProgress.Entry snapshotNoShards = snapshotEntry(snapshot, Collections.emptyList(), ImmutableOpenMap.of());
    final String indexName1 = "index-1";
    final ShardId shardId1 = new ShardId(index(indexName1), 0);
    {
        // completion update for a shard that the (shard-less) snapshot entry does not track -> noop
        final ClusterState state = stateWithSnapshots(snapshotNoShards);
        final SnapshotsService.ShardSnapshotUpdate shardCompletion =
            new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId1, successfulShardStatus(uuid()));
        assertIsNoop(state, shardCompletion);
    }
    {
        // completion update naming the same snapshot name but a different repository -> noop
        final ClusterState state = stateWithSnapshots(
            snapshotEntry(
                snapshot, Collections.singletonList(indexId(indexName1)), shardsMap(shardId1, initShardStatus(uuid()))));
        final SnapshotsService.ShardSnapshotUpdate shardCompletion = new SnapshotsService.ShardSnapshotUpdate(
            snapshot("other-repo", snapshot.getSnapshotId().getName()), shardId1, successfulShardStatus(uuid()));
        assertIsNoop(state, shardCompletion);
    }
}
/**
 * Verifies that completing the only shard of a single-shard snapshot moves the whole snapshot entry to
 * {@code SUCCESS}, and that re-applying the same completion update afterwards is a noop.
 */
public void testUpdateSnapshotToSuccess() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sn1 = snapshot(repoName, "snapshot-1");
    final String indexName1 = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName1);
    final ShardId shardId1 = new ShardId(index(indexName1), 0);
    // snapshot with exactly one shard, currently initializing on the data node
    final SnapshotsInProgress.Entry snapshotSingleShard =
        snapshotEntry(sn1, Collections.singletonList(indexId1), shardsMap(shardId1, initShardStatus(dataNodeId)));
    assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
    // completing the single shard must complete the snapshot as a whole
    final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
    final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
    assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
    // a duplicate completion update (e.g. from a data node resend) must not change the state again
    assertIsNoop(updatedClusterState, completeShard);
}
/**
 * Verifies that completing one of two shards of a snapshot keeps the snapshot entry in {@code STARTED}
 * (it is not yet complete), and that re-applying the same shard completion is a noop.
 */
public void testUpdateSnapshotMultipleShards() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sn1 = snapshot(repoName, "snapshot-1");
    final String indexName1 = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName1);
    final Index routingIndex1 = index(indexName1);
    final ShardId shardId1 = new ShardId(routingIndex1, 0);
    final ShardId shardId2 = new ShardId(routingIndex1, 1);
    final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
    // snapshot tracking two shards of the same index, both initializing
    final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(sn1, Collections.singletonList(indexId1),
        ImmutableOpenMap.builder(shardsMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
    assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
    // complete only the first shard; the second is still running
    final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
    final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
    // snapshot must remain STARTED while any shard is unfinished
    assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
    assertIsNoop(updatedClusterState, completeShard);
}
/**
 * Verifies that completing the only shard clone of a single-shard clone operation moves the clone entry to
 * {@code SUCCESS}, and that re-applying the same completion update is a noop.
 */
public void testUpdateCloneToSuccess() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
    final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
    final String indexName1 = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName1);
    // clone operations address shards by repository shard id rather than routing shard id
    final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
    final SnapshotsInProgress.Entry cloneSingleShard =
        cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), clonesMap(shardId1, initShardStatus(dataNodeId)));
    assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
    // completing the single shard clone must complete the clone operation as a whole
    final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
    final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneSingleShard), completeShard);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
    assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
    assertIsNoop(updatedClusterState, completeShard);
}
/**
 * Verifies that completing one of two shard clones of a clone operation keeps the clone entry in
 * {@code STARTED}, and that re-applying the same completion update is a noop.
 */
public void testUpdateCloneMultipleShards() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
    final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
    final String indexName1 = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName1);
    final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
    final RepositoryShardId shardId2 = new RepositoryShardId(indexId1, 1);
    final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
    // clone operation tracking two repository shards, both initializing
    final SnapshotsInProgress.Entry cloneMultipleShards = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
        ImmutableOpenMap.builder(clonesMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
    assertThat(cloneMultipleShards.state(), is(SnapshotsInProgress.State.STARTED));
    // complete only the first shard clone; the second is still running
    final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
    final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneMultipleShards), completeShard);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
    // clone must remain STARTED while any shard clone is unfinished
    assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
    assertIsNoop(updatedClusterState, completeShard);
}
/**
 * Verifies that when a shard clone completes and a queued regular snapshot is waiting on the same shard, the
 * queued shard snapshot is started with a state derived from the primary's routing: MISSING when the primary
 * is unassigned, INIT when it is started, and WAITING when it is still initializing.
 */
public void testCompletedCloneStartsSnapshot() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
    final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
    final String indexName1 = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName1);
    final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
    final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
    // running clone operation on the repository shard
    final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
        clonesMap(shardId1, shardInitStatus));
    final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName1);
    final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
    final ShardId routingShardId1 = new ShardId(stateWithIndex.metadata().index(indexName1).getIndex(), 0);
    // regular snapshot queued behind the clone on the same shard
    final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
        shardsMap(routingShardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
    assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
    // 1. case: shard that just finished cloning is unassigned -> shard snapshot should go to MISSING state
    final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(stateWithIndex, cloneSingleShard, snapshotSingleShard);
    final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, uuid());
    {
        final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
        final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
        assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
        // with its only shard MISSING the queued snapshot completes immediately
        final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
        assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
        assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.MISSING));
        assertIsNoop(updatedClusterState, completeShardClone);
    }
    // 2. case: shard that just finished cloning is assigned correctly -> shard snapshot should go to INIT state
    final ClusterState stateWithAssignedRoutingShard =
        ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
            RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
                IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
                    new IndexShardRoutingTable.Builder(routingShardId1).addShard(
                        TestShardRouting.newShardRouting(
                            routingShardId1, dataNodeId, true, ShardRoutingState.STARTED)
                    ).build())).build()).build();
    {
        final ClusterState updatedClusterState = applyUpdates(stateWithAssignedRoutingShard, completeShardClone);
        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
        final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
        assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
        final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
        assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
        // shard snapshot is assigned to the node holding the started primary
        final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId1);
        assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
        assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
        assertIsNoop(updatedClusterState, completeShardClone);
    }
    // 3. case: shard that just finished cloning is currently initializing -> shard snapshot should go to WAITING state
    final ClusterState stateWithInitializingRoutingShard =
        ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
            RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
                IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
                    new IndexShardRoutingTable.Builder(routingShardId1).addShard(
                        TestShardRouting.newShardRouting(
                            routingShardId1, dataNodeId, true, ShardRoutingState.INITIALIZING)
                    ).build())).build()).build();
    {
        final ClusterState updatedClusterState = applyUpdates(stateWithInitializingRoutingShard, completeShardClone);
        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
        final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
        assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
        final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
        assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
        assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.WAITING));
        assertIsNoop(updatedClusterState, completeShardClone);
    }
}
/**
 * Verifies that when a regular shard snapshot completes and a clone operation is queued on the same
 * repository shard, the queued shard clone is started in {@code INIT} state, assigned to the local
 * (master) node since clones execute on master.
 */
public void testCompletedSnapshotStartsClone() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
    final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
    final String indexName = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName);
    final RepositoryShardId repositoryShardId = new RepositoryShardId(indexId1, 0);
    // clone operation queued behind the running snapshot on the same repository shard
    final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
        clonesMap(repositoryShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
    final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
    final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
    final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
    // regular snapshot currently running on the shard
    final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
        shardsMap(routingShardId, initShardStatus(dataNodeId)));
    assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
    final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
    final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard, cloneSingleShard), completeShard);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
    assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
    final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
    assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
    // the shard clone is started and assigned to the local (master) node
    final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedSnapshot.clones().get(repositoryShardId);
    assertThat(shardCloneStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
    assertThat(shardCloneStatus.nodeId(), is(updatedClusterState.nodes().getLocalNodeId()));
    assertIsNoop(updatedClusterState, completeShard);
}
/**
 * Verifies that when a shard snapshot completes and another snapshot is queued on the same shard, the queued
 * shard snapshot is started in {@code INIT} state on the same data node.
 */
public void testCompletedSnapshotStartsNextSnapshot() throws Exception {
    final String repoName = "test-repo";
    final String indexName = "index-1";
    final String dataNodeId = uuid();
    final IndexId indexId1 = indexId(indexName);
    final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
    final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot-1");
    final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
    // first snapshot currently running on the shard
    final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
        shardsMap(routingShardId, initShardStatus(dataNodeId)));
    final Snapshot queuedSnapshot = snapshot(repoName, "test-snapshot-2");
    // second snapshot queued behind the first on the same shard
    final SnapshotsInProgress.Entry queuedSnapshotSingleShard = snapshotEntry(queuedSnapshot, Collections.singletonList(indexId1),
        shardsMap(routingShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
    final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
    final ClusterState updatedClusterState =
        applyUpdates(stateWithSnapshots(snapshotSingleShard, queuedSnapshotSingleShard), completeShard);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry completedSnapshot = snapshotsInProgress.entries().get(0);
    assertThat(completedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
    final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
    assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
    // the queued shard snapshot is kicked off on the node that completed the previous one
    final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId);
    assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
    assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
    assertIsNoop(updatedClusterState, completeShard);
}
/**
 * Verifies that when a shard clone completes and another clone operation is queued on the same repository
 * shard, the queued shard clone is started in {@code INIT} state.
 */
public void testCompletedCloneStartsNextClone() throws Exception {
    final String repoName = "test-repo";
    final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
    final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
    final String indexName1 = "index-1";
    final IndexId indexId1 = indexId(indexName1);
    final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
    final String masterNodeId = uuid();
    // first clone running on the repository shard
    final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
        clonesMap(shardId1, initShardStatus(masterNodeId)));
    final Snapshot queuedTargetSnapshot = snapshot(repoName, "test-snapshot");
    // second clone of the same source queued behind the first on the same repository shard
    final SnapshotsInProgress.Entry queuedClone = cloneEntry(queuedTargetSnapshot, sourceSnapshot.getSnapshotId(),
        clonesMap(shardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
    assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
    final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(
        ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(masterNodeId)).build(), cloneSingleShard, queuedClone);
    final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, masterNodeId);
    final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
    final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
    final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
    assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
    // the queued clone takes over the now-free repository shard
    final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
    assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
    assertThat(startedSnapshot.clones().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT));
    assertIsNoop(updatedClusterState, completeShardClone);
}
/** Builds a {@link DiscoveryNodes} instance whose local node id is the given id. */
private static DiscoveryNodes discoveryNodes(String localNodeId) {
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    nodesBuilder.localNodeId(localNodeId);
    return nodesBuilder.build();
}
/** Creates a single-entry shard-status map keyed by routing {@link ShardId}. */
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsMap(
    ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
    final ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
    builder.fPut(shardId, shardStatus);
    return builder.build();
}
/** Creates a single-entry shard-status map keyed by {@link RepositoryShardId} (used for clone entries). */
private static ImmutableOpenMap<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clonesMap(
    RepositoryShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
    final ImmutableOpenMap.Builder<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> builder =
        ImmutableOpenMap.builder();
    builder.fPut(shardId, shardStatus);
    return builder.build();
}
/** Creates an update that marks the given routing shard of {@code snapshot} as successfully snapshotted on {@code nodeId}. */
private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, ShardId shardId, String nodeId) {
    final SnapshotsInProgress.ShardSnapshotStatus successStatus = successfulShardStatus(nodeId);
    return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successStatus);
}
/** Creates an update that marks the given repository shard of {@code snapshot} as successfully cloned on {@code nodeId}. */
private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, RepositoryShardId shardId, String nodeId) {
    final SnapshotsInProgress.ShardSnapshotStatus successStatus = successfulShardStatus(nodeId);
    return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successStatus);
}
/**
 * Builds a cluster state containing one single-shard, zero-replica index per given name,
 * each with an empty (i.e. fully unassigned) routing table entry.
 */
private static ClusterState stateWithUnassignedIndices(String... indexNames) {
    final Metadata.Builder metadata = Metadata.builder(Metadata.EMPTY_METADATA);
    final RoutingTable.Builder routingTable = RoutingTable.builder();
    // Single pass: register metadata and an unassigned routing entry for each index.
    for (String indexName : indexNames) {
        final IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
            .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT.id))
            .numberOfShards(1)
            .numberOfReplicas(0)
            .build();
        metadata.put(indexMetadata, false);
        final Index index = indexMetadata.getIndex();
        routingTable.add(IndexRoutingTable.builder(index)
            .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)).build()));
    }
    return ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).routingTable(routingTable.build()).build();
}
/** Returns a copy of {@code state} (version bumped by one) with the given snapshot entries installed as the in-progress custom. */
private static ClusterState stateWithSnapshots(ClusterState state, SnapshotsInProgress.Entry... entries) {
    final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.of(Arrays.asList(entries));
    return ClusterState.builder(state)
        .version(state.version() + 1L)
        .putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress)
        .build();
}
/** Like {@code stateWithSnapshots(ClusterState, Entry...)} but starting from an empty state with a random local node id. */
private static ClusterState stateWithSnapshots(SnapshotsInProgress.Entry... entries) {
    final ClusterState baseState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(uuid())).build();
    return stateWithSnapshots(baseState, entries);
}
/** Asserts that applying {@code shardCompletion} to {@code state} returns the very same state instance (i.e. is a no-op). */
private static void assertIsNoop(ClusterState state, SnapshotsService.ShardSnapshotUpdate shardCompletion) throws Exception {
    final ClusterState resultingState = applyUpdates(state, shardCompletion);
    assertSame(resultingState, state);
}
/** Runs the given shard-snapshot updates through the production shard-state executor and returns the resulting state. */
private static ClusterState applyUpdates(ClusterState state, SnapshotsService.ShardSnapshotUpdate... updates) throws Exception {
    final List<SnapshotsService.ShardSnapshotUpdate> updateList = Arrays.asList(updates);
    return SnapshotsService.SHARD_STATE_EXECUTOR.execute(state, updateList).resultingState;
}
/** Creates a started snapshot entry for the given snapshot over the given indices and shard statuses. */
private static SnapshotsInProgress.Entry snapshotEntry(Snapshot snapshot, List<IndexId> indexIds,
                                                       ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards) {
    // NOTE(review): the two random flags presumably map to includeGlobalState / partial in
    // startedEntry's signature — confirm against SnapshotsInProgress. Call order is preserved.
    final boolean includeGlobalState = randomBoolean();
    final boolean partial = randomBoolean();
    return SnapshotsInProgress.startedEntry(snapshot, includeGlobalState, partial, indexIds, Collections.emptyList(),
        1L, randomNonNegativeLong(), shards, Collections.emptyMap(), Version.CURRENT);
}
/**
 * Creates a started clone entry from {@code source} to {@code snapshot} covering the given
 * per-shard clone statuses. The entry's index list is derived from the distinct indices that
 * appear among the clone map's {@link RepositoryShardId} keys.
 */
private static SnapshotsInProgress.Entry cloneEntry(
    Snapshot snapshot, SnapshotId source, ImmutableOpenMap<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clones) {
    // Collect the distinct IndexIds referenced by the clone shard ids (keys() yields cursors, hence k.value).
    final List<IndexId> indexIds = StreamSupport.stream(clones.keys().spliterator(), false)
        .map(k -> k.value.index()).distinct().collect(Collectors.toList());
    return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones);
}
/** Creates a shard-snapshot status in its initial (running) state assigned to {@code nodeId}, with a random generation. */
private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
    final String generation = uuid();
    return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, generation);
}
/** Creates a SUCCESS shard-snapshot status assigned to {@code nodeId}, with a random generation. */
private static SnapshotsInProgress.ShardSnapshotStatus successfulShardStatus(String nodeId) {
    final String generation = uuid();
    return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, SnapshotsInProgress.ShardState.SUCCESS, generation);
}
/** Creates a {@link Snapshot} in the given repository with the given name and a random uuid. */
private static Snapshot snapshot(String repoName, String name) {
    final SnapshotId snapshotId = new SnapshotId(name, uuid());
    return new Snapshot(repoName, snapshotId);
}
/** Creates an {@link Index} with the given name and a random uuid. */
private static Index index(String name) {
    final String indexUuid = uuid();
    return new Index(name, indexUuid);
}
/** Creates an {@link IndexId} with the given name and a random uuid. */
private static IndexId indexId(String name) {
    final String id = uuid();
    return new IndexId(name, id);
}
/** Generates a deterministic-per-seed random base64 uuid using the test's random source. */
private static String uuid() {
    final String randomId = UUIDs.randomBase64UUID(random());
    return randomId;
}
}

View File

@ -118,6 +118,10 @@ public class MockRepository extends FsRepository {
/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
private volatile boolean blockAndFailOnWriteSnapFile;
private volatile boolean blockOnWriteShardLevelMeta;
private volatile boolean blockOnReadIndexMeta;
/**
* Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}.
*/
@ -189,6 +193,8 @@ public class MockRepository extends FsRepository {
blockOnWriteIndexFile = false;
blockAndFailOnWriteSnapFile = false;
blockOnDeleteIndexN = false;
blockOnWriteShardLevelMeta = false;
blockOnReadIndexMeta = false;
this.notifyAll();
}
@ -212,6 +218,14 @@ public class MockRepository extends FsRepository {
blockOnDeleteIndexN = true;
}
public void setBlockOnWriteShardLevelMeta() {
blockOnWriteShardLevelMeta = true;
}
public void setBlockOnReadIndexMeta() {
blockOnReadIndexMeta = true;
}
public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}
@ -229,7 +243,7 @@ public class MockRepository extends FsRepository {
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile || blockOnDeleteIndexN) {
blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) {
blocked = true;
this.wait();
wasBlocked = true;
@ -365,8 +379,12 @@ public class MockRepository extends FsRepository {
@Override
public InputStream readBlob(String name) throws IOException {
maybeReadErrorAfterBlock(name);
maybeIOExceptionOrBlock(name);
if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) && path().equals(basePath()) == false) {
blockExecutionAndMaybeWait(name);
} else {
maybeReadErrorAfterBlock(name);
maybeIOExceptionOrBlock(name);
}
return super.readBlob(name);
}
@ -430,6 +448,10 @@ public class MockRepository extends FsRepository {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
maybeIOExceptionOrBlock(blobName);
if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
&& path().equals(basePath()) == false) {
blockExecutionAndMaybeWait(blobName);
}
super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
if (RandomizedContext.current().getRandom().nextBoolean()) {
// for network based repositories, the blob may have been written but we may still