Catch exception when reading corrupted snapshot.
Single corrupted snapshot file shouldn't prevent listing all other snapshot in repository.
This commit is contained in:
parent
0d349854d3
commit
2e6d72de27
|
@ -41,6 +41,8 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
|
|||
|
||||
private String[] snapshots = Strings.EMPTY_ARRAY;
|
||||
|
||||
private boolean ignoreUnavailable;
|
||||
|
||||
public GetSnapshotsRequest() {
|
||||
}
|
||||
|
||||
|
@ -112,11 +114,28 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set to true to ignore unavailable snapshots
|
||||
*
|
||||
* @return this request
|
||||
*/
|
||||
public GetSnapshotsRequest ignoreUnavailable(boolean ignoreUnavailable) {
|
||||
this.ignoreUnavailable = ignoreUnavailable;
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* @return Whether snapshots should be ignored when unavailable (corrupt or temporarily not fetchable)
|
||||
*/
|
||||
public boolean ignoreUnavailable() {
|
||||
return ignoreUnavailable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
repository = in.readString();
|
||||
snapshots = in.readStringArray();
|
||||
ignoreUnavailable = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,5 +143,6 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
|
|||
super.writeTo(out);
|
||||
out.writeString(repository);
|
||||
out.writeStringArray(snapshots);
|
||||
out.writeBoolean(ignoreUnavailable);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,4 +84,16 @@ public class GetSnapshotsRequestBuilder extends MasterNodeOperationRequestBuilde
|
|||
request.snapshots(ArrayUtils.concat(request.snapshots(), snapshots));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes the request ignore unavailable snapshots
|
||||
*
|
||||
* @param ignoreUnavailable true to ignore unavailable snapshots.
|
||||
* @return this builder
|
||||
*/
|
||||
public GetSnapshotsRequestBuilder setIgnoreUnavailable(boolean ignoreUnavailable) {
|
||||
request.ignoreUnavailable(ignoreUnavailable);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|||
try {
|
||||
List<SnapshotInfo> snapshotInfoBuilder = new ArrayList<>();
|
||||
if (isAllSnapshots(request.snapshots())) {
|
||||
List<Snapshot> snapshots = snapshotsService.snapshots(request.repository());
|
||||
List<Snapshot> snapshots = snapshotsService.snapshots(request.repository(), request.ignoreUnavailable());
|
||||
for (Snapshot snapshot : snapshots) {
|
||||
snapshotInfoBuilder.add(new SnapshotInfo(snapshot));
|
||||
}
|
||||
|
|
|
@ -47,7 +47,10 @@ public class RestGetSnapshotsAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
String repository = request.param("repository");
|
||||
String[] snapshots = request.paramAsStringArray("snapshot", Strings.EMPTY_ARRAY);
|
||||
|
||||
GetSnapshotsRequest getSnapshotsRequest = getSnapshotsRequest(repository).snapshots(snapshots);
|
||||
getSnapshotsRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", getSnapshotsRequest.ignoreUnavailable()));
|
||||
|
||||
getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
|
||||
client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener<GetSnapshotsResponse>(channel));
|
||||
}
|
||||
|
|
|
@ -54,9 +54,12 @@ public class RestSnapshotAction extends AbstractCatAction {
|
|||
|
||||
@Override
|
||||
protected void doRequest(final RestRequest request, RestChannel channel, Client client) {
|
||||
GetSnapshotsRequest getSnapshotsRequest = new GetSnapshotsRequest();
|
||||
getSnapshotsRequest.repository(request.param("repository"));
|
||||
getSnapshotsRequest.snapshots(new String[] { GetSnapshotsRequest.ALL_SNAPSHOTS });
|
||||
GetSnapshotsRequest getSnapshotsRequest = new GetSnapshotsRequest()
|
||||
.repository(request.param("repository"))
|
||||
.snapshots(new String[]{GetSnapshotsRequest.ALL_SNAPSHOTS});
|
||||
|
||||
getSnapshotsRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", getSnapshotsRequest.ignoreUnavailable()));
|
||||
|
||||
getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
|
||||
|
||||
client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestResponseListener<GetSnapshotsResponse>(channel) {
|
||||
|
|
|
@ -141,7 +141,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|||
* @param repositoryName repository name
|
||||
* @return list of snapshots
|
||||
*/
|
||||
public List<Snapshot> snapshots(String repositoryName) {
|
||||
public List<Snapshot> snapshots(String repositoryName, boolean ignoreUnavailable) {
|
||||
Set<Snapshot> snapshotSet = new HashSet<>();
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, null);
|
||||
for (SnapshotsInProgress.Entry entry : entries) {
|
||||
|
@ -150,8 +150,17 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
List<SnapshotId> snapshotIds = repository.snapshots();
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
snapshotSet.add(repository.readSnapshot(snapshotId));
|
||||
try {
|
||||
snapshotSet.add(repository.readSnapshot(snapshotId));
|
||||
} catch (Exception ex) {
|
||||
if (ignoreUnavailable) {
|
||||
logger.warn("failed to get snapshot [{}]", ex, snapshotId);
|
||||
} else {
|
||||
throw new SnapshotException(snapshotId, "Snapshot could not be read", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ArrayList<Snapshot> snapshotList = new ArrayList<>(snapshotSet);
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return Collections.unmodifiableList(snapshotList);
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.ListenableActionFuture;
|
|||
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
|
||||
|
@ -54,6 +55,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
|||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
|
@ -77,6 +79,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.client.Requests.getSnapshotsRequest;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
|
@ -2047,4 +2050,53 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertThat(ex.getMessage(), containsString("Invalid snapshot name"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testListCorruptedSnapshot() throws Exception {
|
||||
Client client = client();
|
||||
Path repo = randomRepoPath();
|
||||
logger.info("--> creating repository at " + repo.toAbsolutePath());
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(Settings.settingsBuilder()
|
||||
.put("location", repo)
|
||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||
|
||||
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
|
||||
ensureYellow();
|
||||
logger.info("--> indexing some data");
|
||||
indexRandom(true,
|
||||
client().prepareIndex("test-idx-1", "doc").setSource("foo", "bar"),
|
||||
client().prepareIndex("test-idx-2", "doc").setSource("foo", "bar"),
|
||||
client().prepareIndex("test-idx-3", "doc").setSource("foo", "bar"));
|
||||
|
||||
logger.info("--> creating 2 snapshots");
|
||||
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("test-idx-*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("test-idx-*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
logger.info("--> truncate snapshot file to make it unreadable");
|
||||
Path snapshotPath = repo.resolve("snap-test-snap-2.dat");
|
||||
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
|
||||
outChan.truncate(randomInt(10));
|
||||
}
|
||||
|
||||
logger.info("--> get snapshots request should return both snapshots");
|
||||
List<SnapshotInfo> snapshotInfos = client.admin().cluster()
|
||||
.prepareGetSnapshots("test-repo")
|
||||
.setIgnoreUnavailable(true).get().getSnapshots();
|
||||
|
||||
assertThat(snapshotInfos.size(), equalTo(1));
|
||||
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
assertThat(snapshotInfos.get(0).name(), equalTo("test-snap-1"));
|
||||
|
||||
try {
|
||||
client.admin().cluster().prepareGetSnapshots("test-repo").setIgnoreUnavailable(false).get().getSnapshots();
|
||||
} catch (SnapshotException ex) {
|
||||
assertThat(ex.snapshot().getRepository(), equalTo("test-repo"));
|
||||
assertThat(ex.snapshot().getSnapshot(), equalTo("test-snap-2"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -259,6 +259,9 @@ GET /_snapshot/my_backup/_all
|
|||
-----------------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
The command fails if some of the snapshots are unavailable. The boolean parameter `ignore_unvailable` can be used to
|
||||
return all snapshots that are currently available.
|
||||
|
||||
A currently running snapshot can be retrieved using the following command:
|
||||
|
||||
[source,sh]
|
||||
|
|
|
@ -12,6 +12,11 @@
|
|||
}
|
||||
},
|
||||
"params": {
|
||||
"ignore_unavailable": {
|
||||
"type": "boolean",
|
||||
"description": "Set to true to ignore unavailable snapshots",
|
||||
"default": false
|
||||
},
|
||||
"master_timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout for connection to master node"
|
||||
|
|
Loading…
Reference in New Issue