Snapshot/Restore: add ability to retrieve currently running snapshots

Together with #8782 it should help in the situations simliar to #8887 by adding an ability to get information about currently running snapshot without accessing the repository itself.

Closes #8887
This commit is contained in:
Igor Motov 2015-01-23 15:31:29 -05:00
parent 06417a85e0
commit c5ebdf11bb
7 changed files with 66 additions and 16 deletions

View File

@ -176,6 +176,13 @@ All snapshots currently stored in the repository can be listed using the followi
$ curl -XGET "localhost:9200/_snapshot/my_backup/_all"
-----------------------------------
coming[2.0] A currently running snapshot can be retrieved using the following command:
[source,shell]
-----------------------------------
$ curl -XGET "localhost:9200/_snapshot/my_backup/_current"
-----------------------------------
A snapshot can be deleted from the repository using the following command:
[source,shell]

View File

@ -34,6 +34,9 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
*/
public class GetSnapshotsRequest extends MasterNodeOperationRequest<GetSnapshotsRequest> {
public static final String ALL_SNAPSHOTS = "_all";
public static final String CURRENT_SNAPSHOT = "_current";
private String repository;
private String[] snapshots = Strings.EMPTY_ARRAY;

View File

@ -70,6 +70,16 @@ public class GetSnapshotsRequestBuilder extends MasterNodeOperationRequestBuilde
return this;
}
/**
* Makes the request to return the current snapshot
*
* @return this builder
*/
public GetSnapshotsRequestBuilder setCurrentSnapshot() {
request.snapshots(new String[] {GetSnapshotsRequest.CURRENT_SNAPSHOT});
return this;
}
/**
* Adds additional snapshots to the list of snapshots to return
*

View File

@ -52,7 +52,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeOperationAct
@Override
protected String executor() {
return ThreadPool.Names.SNAPSHOT;
return ThreadPool.Names.GENERIC;
}
@Override
@ -72,26 +72,35 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeOperationAct
@Override
protected void masterOperation(final GetSnapshotsRequest request, ClusterState state, final ActionListener<GetSnapshotsResponse> listener) throws ElasticsearchException {
SnapshotId[] snapshotIds = new SnapshotId[request.snapshots().length];
for (int i = 0; i < snapshotIds.length; i++) {
snapshotIds[i] = new SnapshotId(request.repository(), request.snapshots()[i]);
}
try {
ImmutableList.Builder<SnapshotInfo> snapshotInfoBuilder = ImmutableList.builder();
if (snapshotIds.length > 0) {
for (SnapshotId snapshotId : snapshotIds) {
snapshotInfoBuilder.add(new SnapshotInfo(snapshotsService.snapshot(snapshotId)));
}
} else {
if (isAllSnapshots(request.snapshots())) {
ImmutableList<Snapshot> snapshots = snapshotsService.snapshots(request.repository());
for (Snapshot snapshot : snapshots) {
snapshotInfoBuilder.add(new SnapshotInfo(snapshot));
}
} else if (isCurrentSnapshots(request.snapshots())) {
ImmutableList<Snapshot> snapshots = snapshotsService.currentSnapshots(request.repository());
for (Snapshot snapshot : snapshots) {
snapshotInfoBuilder.add(new SnapshotInfo(snapshot));
}
} else {
for (int i = 0; i < request.snapshots().length; i++) {
SnapshotId snapshotId = new SnapshotId(request.repository(), request.snapshots()[i]);
snapshotInfoBuilder.add(new SnapshotInfo(snapshotsService.snapshot(snapshotId)));
}
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfoBuilder.build()));
} catch (Throwable t) {
listener.onFailure(t);
}
}
private boolean isAllSnapshots(String[] snapshots) {
return (snapshots.length == 0) || (snapshots.length == 1 && GetSnapshotsRequest.ALL_SNAPSHOTS.equalsIgnoreCase(snapshots[0]));
}
private boolean isCurrentSnapshots(String[] snapshots) {
return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]));
}
}

View File

@ -47,9 +47,6 @@ public class RestGetSnapshotsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String repository = request.param("repository");
String[] snapshots = request.paramAsStringArray("snapshot", Strings.EMPTY_ARRAY);
if (snapshots.length == 1 && "_all".equalsIgnoreCase(snapshots[0])) {
snapshots = Strings.EMPTY_ARRAY;
}
GetSnapshotsRequest getSnapshotsRequest = getSnapshotsRequest(repository).snapshots(snapshots);
getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener<GetSnapshotsResponse>(channel));

View File

@ -161,6 +161,22 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
return ImmutableList.copyOf(snapshotList);
}
/**
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
*
* @param repositoryName repository name
* @return list of snapshots
*/
public ImmutableList<Snapshot> currentSnapshots(String repositoryName) {
List<Snapshot> snapshotList = newArrayList();
ImmutableList<SnapshotMetaData.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotMetaData.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
CollectionUtil.timSort(snapshotList);
return ImmutableList.copyOf(snapshotList);
}
/**
* Initializes the snapshotting process.
* <p/>

View File

@ -1325,7 +1325,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
// Pick one node and block it
String blockedNode = blockNodeWithIndex("test-idx");
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
@ -1358,10 +1357,16 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
}
}
logger.info("--> checking that _current returns the currently running snapshot", blockedNode);
GetSnapshotsResponse getResponse = client.admin().cluster().prepareGetSnapshots("test-repo").setCurrentSnapshot().execute().actionGet();
assertThat(getResponse.getSnapshots().size(), equalTo(1));
SnapshotInfo snapshotInfo = getResponse.getSnapshots().get(0);
assertThat(snapshotInfo.state(), equalTo(SnapshotState.IN_PROGRESS));
logger.info("--> unblocking blocked node");
unblockNode(blockedNode);
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
logger.info("--> done");
@ -1381,6 +1386,9 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
response = client.admin().cluster().prepareSnapshotStatus().execute().actionGet();
assertThat(response.getSnapshots().size(), equalTo(0));
logger.info("--> checking that _current no longer returns the snapshot", blockedNode);
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("_current").execute().actionGet().getSnapshots().isEmpty(), equalTo(true));
try {
client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap-doesnt-exist").execute().actionGet();
fail();