Merge pull request #14471 from xuzha/snapshot_list

Add ignore_unavailable parameter to skip unavailable snapshot

closes #13887
This commit is contained in:
Xu Zhang 2015-11-18 22:09:08 -08:00
commit a077f4a933
9 changed files with 113 additions and 6 deletions

View File

@ -41,6 +41,8 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
private String[] snapshots = Strings.EMPTY_ARRAY; private String[] snapshots = Strings.EMPTY_ARRAY;
private boolean ignoreUnavailable;
public GetSnapshotsRequest() { public GetSnapshotsRequest() {
} }
@ -112,11 +114,28 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
return this; return this;
} }
/**
* Set to true to ignore unavailable snapshots
*
* @return this request
*/
public GetSnapshotsRequest ignoreUnavailable(boolean ignoreUnavailable) {
this.ignoreUnavailable = ignoreUnavailable;
return this;
}
/**
* @return Whether snapshots should be ignored when unavailable (corrupt or temporarily not fetchable)
*/
public boolean ignoreUnavailable() {
return ignoreUnavailable;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
repository = in.readString(); repository = in.readString();
snapshots = in.readStringArray(); snapshots = in.readStringArray();
ignoreUnavailable = in.readBoolean();
} }
@Override @Override
@ -124,5 +143,6 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
super.writeTo(out); super.writeTo(out);
out.writeString(repository); out.writeString(repository);
out.writeStringArray(snapshots); out.writeStringArray(snapshots);
out.writeBoolean(ignoreUnavailable);
} }
} }

View File

@ -84,4 +84,16 @@ public class GetSnapshotsRequestBuilder extends MasterNodeOperationRequestBuilde
request.snapshots(ArrayUtils.concat(request.snapshots(), snapshots)); request.snapshots(ArrayUtils.concat(request.snapshots(), snapshots));
return this; return this;
} }
/**
* Makes the request ignore unavailable snapshots
*
* @param ignoreUnavailable true to ignore unavailable snapshots.
* @return this builder
*/
public GetSnapshotsRequestBuilder setIgnoreUnavailable(boolean ignoreUnavailable) {
request.ignoreUnavailable(ignoreUnavailable);
return this;
}
} }

View File

@ -74,7 +74,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
try { try {
List<SnapshotInfo> snapshotInfoBuilder = new ArrayList<>(); List<SnapshotInfo> snapshotInfoBuilder = new ArrayList<>();
if (isAllSnapshots(request.snapshots())) { if (isAllSnapshots(request.snapshots())) {
List<Snapshot> snapshots = snapshotsService.snapshots(request.repository()); List<Snapshot> snapshots = snapshotsService.snapshots(request.repository(), request.ignoreUnavailable());
for (Snapshot snapshot : snapshots) { for (Snapshot snapshot : snapshots) {
snapshotInfoBuilder.add(new SnapshotInfo(snapshot)); snapshotInfoBuilder.add(new SnapshotInfo(snapshot));
} }

View File

@ -47,7 +47,10 @@ public class RestGetSnapshotsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String repository = request.param("repository"); String repository = request.param("repository");
String[] snapshots = request.paramAsStringArray("snapshot", Strings.EMPTY_ARRAY); String[] snapshots = request.paramAsStringArray("snapshot", Strings.EMPTY_ARRAY);
GetSnapshotsRequest getSnapshotsRequest = getSnapshotsRequest(repository).snapshots(snapshots); GetSnapshotsRequest getSnapshotsRequest = getSnapshotsRequest(repository).snapshots(snapshots);
getSnapshotsRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", getSnapshotsRequest.ignoreUnavailable()));
getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout())); getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener<GetSnapshotsResponse>(channel)); client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener<GetSnapshotsResponse>(channel));
} }

View File

@ -54,9 +54,12 @@ public class RestSnapshotAction extends AbstractCatAction {
@Override @Override
protected void doRequest(final RestRequest request, RestChannel channel, Client client) { protected void doRequest(final RestRequest request, RestChannel channel, Client client) {
GetSnapshotsRequest getSnapshotsRequest = new GetSnapshotsRequest(); GetSnapshotsRequest getSnapshotsRequest = new GetSnapshotsRequest()
getSnapshotsRequest.repository(request.param("repository")); .repository(request.param("repository"))
getSnapshotsRequest.snapshots(new String[] { GetSnapshotsRequest.ALL_SNAPSHOTS }); .snapshots(new String[]{GetSnapshotsRequest.ALL_SNAPSHOTS});
getSnapshotsRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", getSnapshotsRequest.ignoreUnavailable()));
getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout())); getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestResponseListener<GetSnapshotsResponse>(channel) { client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestResponseListener<GetSnapshotsResponse>(channel) {

View File

@ -141,7 +141,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param repositoryName repository name * @param repositoryName repository name
* @return list of snapshots * @return list of snapshots
*/ */
public List<Snapshot> snapshots(String repositoryName) { public List<Snapshot> snapshots(String repositoryName, boolean ignoreUnavailable) {
Set<Snapshot> snapshotSet = new HashSet<>(); Set<Snapshot> snapshotSet = new HashSet<>();
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, null); List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotsInProgress.Entry entry : entries) { for (SnapshotsInProgress.Entry entry : entries) {
@ -150,8 +150,17 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
Repository repository = repositoriesService.repository(repositoryName); Repository repository = repositoriesService.repository(repositoryName);
List<SnapshotId> snapshotIds = repository.snapshots(); List<SnapshotId> snapshotIds = repository.snapshots();
for (SnapshotId snapshotId : snapshotIds) { for (SnapshotId snapshotId : snapshotIds) {
snapshotSet.add(repository.readSnapshot(snapshotId)); try {
snapshotSet.add(repository.readSnapshot(snapshotId));
} catch (Exception ex) {
if (ignoreUnavailable) {
logger.warn("failed to get snapshot [{}]", ex, snapshotId);
} else {
throw new SnapshotException(snapshotId, "Snapshot could not be read", ex);
}
}
} }
ArrayList<Snapshot> snapshotList = new ArrayList<>(snapshotSet); ArrayList<Snapshot> snapshotList = new ArrayList<>(snapshotSet);
CollectionUtil.timSort(snapshotList); CollectionUtil.timSort(snapshotList);
return Collections.unmodifiableList(snapshotList); return Collections.unmodifiableList(snapshotList);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
@ -54,6 +55,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -77,6 +79,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.getSnapshotsRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -2047,4 +2050,53 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(ex.getMessage(), containsString("Invalid snapshot name")); assertThat(ex.getMessage(), containsString("Invalid snapshot name"));
} }
} }
public void testListCorruptedSnapshot() throws Exception {
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at " + repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.settingsBuilder()
.put("location", repo)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureYellow();
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex("test-idx-1", "doc").setSource("foo", "bar"),
client().prepareIndex("test-idx-2", "doc").setSource("foo", "bar"),
client().prepareIndex("test-idx-3", "doc").setSource("foo", "bar"));
logger.info("--> creating 2 snapshots");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> truncate snapshot file to make it unreadable");
Path snapshotPath = repo.resolve("snap-test-snap-2.dat");
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10));
}
logger.info("--> get snapshots request should return both snapshots");
List<SnapshotInfo> snapshotInfos = client.admin().cluster()
.prepareGetSnapshots("test-repo")
.setIgnoreUnavailable(true).get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).name(), equalTo("test-snap-1"));
try {
client.admin().cluster().prepareGetSnapshots("test-repo").setIgnoreUnavailable(false).get().getSnapshots();
} catch (SnapshotException ex) {
assertThat(ex.snapshot().getRepository(), equalTo("test-repo"));
assertThat(ex.snapshot().getSnapshot(), equalTo("test-snap-2"));
}
}
} }

View File

@ -259,6 +259,9 @@ GET /_snapshot/my_backup/_all
----------------------------------- -----------------------------------
// AUTOSENSE // AUTOSENSE
The command fails if some of the snapshots are unavailable. The boolean parameter `ignore_unvailable` can be used to
return all snapshots that are currently available.
A currently running snapshot can be retrieved using the following command: A currently running snapshot can be retrieved using the following command:
[source,sh] [source,sh]

View File

@ -12,6 +12,11 @@
} }
}, },
"params": { "params": {
"ignore_unavailable": {
"type": "boolean",
"description": "Set to true to ignore unavailable snapshots",
"default": false
},
"master_timeout": { "master_timeout": {
"type" : "time", "type" : "time",
"description" : "Explicit operation timeout for connection to master node" "description" : "Explicit operation timeout for connection to master node"