diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc index b3964af4fb2..d1d07a74aa0 100644 --- a/docs/reference/modules/snapshots.asciidoc +++ b/docs/reference/modules/snapshots.asciidoc @@ -175,7 +175,7 @@ started by mistake. [float] === Restore -A snapshot can be restored using this following command: +A snapshot can be restored using the following command: [source,shell] ----------------------------------- @@ -205,3 +205,37 @@ closed. The restore operation automatically opens restored indices if they were didn't exist in the cluster. If cluster state is restored, the restored templates that don't currently exist in the cluster are added and existing templates with the same name are replaced by the restored templates. The restored persistent settings are added to the existing persistent settings. + + +[float] +=== Snapshot status + +A list of currently running snapshots with their detailed status information can be obtained using the following command: + +[source,shell] +----------------------------------- +$ curl -XGET "localhost:9200/_snapshot/_status" +----------------------------------- + +In this format, the command will return information about all currently running snapshots. By specifying a repository name, it's possible +to limit the results to a particular repository: + +[source,shell] +----------------------------------- +$ curl -XGET "localhost:9200/_snapshot/my_backup/_status" +----------------------------------- + +If both repository name and snapshot id are specified, this command will return detailed status information for the given snapshot even +if it's not currently running: + +[source,shell] +----------------------------------- +$ curl -XGET "localhost:9200/_snapshot/my_backup/snapshot_1/_status" +----------------------------------- + +Multiple ids are also supported: + +[source,shell] +----------------------------------- +$ curl -XGET "localhost:9200/_snapshot/my_backup/snapshot_1,snapshot_2/_status" +----------------------------------- diff --git a/rest-api-spec/api/snapshot.status.json b/rest-api-spec/api/snapshot.status.json new file mode 100644 index 00000000000..a258713f601 --- /dev/null +++ b/rest-api-spec/api/snapshot.status.json @@ -0,0 +1,27 @@ +{ + "snapshot.status": { + "documentation": "http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html", + "methods": ["GET"], + "url": { + "path": "/_snapshot/_status", + "paths": ["/_snapshot/_status", "/_snapshot/{repository}/_status", "/_snapshot/{repository}/{snapshot}/_status"], + "parts": { + "repository": { + "type": "string", + "description": "A repository name" + }, + "snapshot": { + "type": "list", + "description": "A comma-separated list of snapshot names" + } + }, + "params": { + "master_timeout": { + "type" : "time", + "description" : "Explicit operation timeout for connection to master node" + } + } + }, + "body": null + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index 8ef29d61ef4..3eda2b1e036 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -52,6 +52,8 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsAction; import org.elasticsearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction; +import org.elasticsearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; @@ -214,6 +216,7 @@ public class ActionModule extends AbstractModule { registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class); registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); + registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); registerAction(IndicesStatusAction.INSTANCE, TransportIndicesStatusAction.class); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStage.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStage.java new file mode 100644 index 00000000000..38be8b24d42 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStage.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; + +/** + */ +public enum SnapshotIndexShardStage { + + /** + * Snapshot hasn't started yet + */ + INIT((byte)0, false), + /** + * Index files are being copied + */ + STARTED((byte)1, false), + /** + * Snapshot metadata is being written + */ + FINALIZE((byte)2, false), + /** + * Snapshot completed successfully + */ + DONE((byte)3, true), + /** + * Snapshot failed + */ + FAILURE((byte)4, true); + + private byte value; + + private boolean completed; + + private SnapshotIndexShardStage(byte value, boolean completed) { + this.value = value; + this.completed = completed; + } + + /** + * Returns code that represents the snapshot state + * + * @return code for the state + */ + public byte value() { + return value; + } + + /** + * Returns true if snapshot completed (successfully or not) + * + * @return true if snapshot completed, false otherwise + */ + public boolean completed() { + return completed; + } + + /** + * Generate snapshot state from code + * + * @param value the state code + * @return state + */ + public static SnapshotIndexShardStage fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return STARTED; + case 2: + return FINALIZE; + case 3: + return DONE; + case 4: + return FAILURE; + default: + throw new ElasticsearchIllegalArgumentException("No snapshot shard stage for value [" + value + "]"); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java new file mode 100644 index 00000000000..9d0546b8d72 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java @@ -0,0 +1,157 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; + +import java.io.IOException; + +/** + */ +public class SnapshotIndexShardStatus extends BroadcastShardOperationResponse implements ToXContent { + + private SnapshotIndexShardStage stage = SnapshotIndexShardStage.INIT; + + private SnapshotStats stats; + + private String nodeId; + + private String failure; + + private SnapshotIndexShardStatus() { + } + + SnapshotIndexShardStatus(String index, int shardId, SnapshotIndexShardStage stage) { + super(index, shardId); + this.stage = stage; + } + + SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus) { + this(shardId, indexShardStatus, null); + } + + SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus, String nodeId) { + super(shardId.getIndex(), shardId.getId()); + switch (indexShardStatus.stage()) { + case INIT: + stage = SnapshotIndexShardStage.INIT; + break; + case STARTED: + stage = SnapshotIndexShardStage.STARTED; + break; + case FINALIZE: + stage = SnapshotIndexShardStage.FINALIZE; + break; + case DONE: + stage = SnapshotIndexShardStage.DONE; + break; + case FAILURE: + stage = SnapshotIndexShardStage.FAILURE; + break; + default: + throw new ElasticsearchIllegalArgumentException("Unknown stage type " + indexShardStatus.stage()); + } + stats = new SnapshotStats(indexShardStatus); + failure = indexShardStatus.failure(); + this.nodeId = nodeId; + } + + /** + * Returns snapshot stage + */ + public SnapshotIndexShardStage getStage() { + return stage; + } + + /** + * Returns snapshot stats + */ + public SnapshotStats getStats() { + return stats; + } + + /** + * Returns node id of the node where snapshot is currently running + */ + public String getNodeId() { + return nodeId; + } + + /** + * Returns reason for snapshot failure + */ + public String getFailure() { + return failure; + } + + + public static SnapshotIndexShardStatus readShardSnapshotStatus(StreamInput in) throws IOException { + SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(); + shardStatus.readFrom(in); + return shardStatus; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeByte(stage.value()); + stats.writeTo(out); + out.writeOptionalString(nodeId); + out.writeOptionalString(failure); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + stage = SnapshotIndexShardStage.fromValue(in.readByte()); + stats = SnapshotStats.readSnapshotStats(in); + nodeId = in.readOptionalString(); + failure = in.readOptionalString(); + } + + static final class Fields { + static final XContentBuilderString STAGE = new XContentBuilderString("stage"); + static final XContentBuilderString REASON = new XContentBuilderString("reason"); + static final XContentBuilderString NODE = new XContentBuilderString("node"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Integer.toString(getShardId())); + builder.field(Fields.STAGE, getStage()); + stats.toXContent(builder, params); + if (getNodeId() != null) { + builder.field(Fields.NODE, getNodeId()); + } + if (getFailure() != null) { + builder.field(Fields.REASON, getFailure()); + } + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java new file mode 100644 index 00000000000..961914e0688 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; + +/** + * Represents snapshot status of all shards in the index + */ +public class SnapshotIndexStatus implements Iterable, ToXContent { + + private final String index; + + private final Map indexShards; + + private final SnapshotShardsStats shardsStats; + + private final SnapshotStats stats; + + SnapshotIndexStatus(String index, Collection shards) { + this.index = index; + + ImmutableMap.Builder builder = ImmutableMap.builder(); + stats = new SnapshotStats(); + for (SnapshotIndexShardStatus shard : shards) { + builder.put(shard.getShardId(), shard); + stats.add(shard.getStats()); + } + shardsStats = new SnapshotShardsStats(shards); + indexShards = builder.build(); + } + + /** + * Returns the index name + */ + public String getIndex() { + return this.index; + } + + /** + * A shard id to index snapshot shard status map + */ + public Map getShards() { + return this.indexShards; + } + + /** + * Shards stats + */ + public SnapshotShardsStats getShardsStats() { + return shardsStats; + } + + /** + * Returns snapshot stats + */ + public SnapshotStats getStats() { + return stats; + } + + @Override + public Iterator iterator() { + return indexShards.values().iterator(); + } + + static final class Fields { + static final XContentBuilderString SHARDS = new XContentBuilderString("shards"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(getIndex(), XContentBuilder.FieldCaseConversion.NONE); + shardsStats.toXContent(builder, params); + stats.toXContent(builder, params); + builder.startObject(Fields.SHARDS); + for (SnapshotIndexShardStatus shard : indexShards.values()) { + shard.toXContent(builder, params); + } + builder.endObject(); + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java new file mode 100644 index 00000000000..8eb9b35b040 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.Collection; + +/** + * Status of a snapshot shards + */ +public class SnapshotShardsStats implements ToXContent { + + private int initializingShards; + private int startedShards; + private int finalizingShards; + private int doneShards; + private int failedShards; + private int totalShards; + + SnapshotShardsStats(Collection shards) { + for (SnapshotIndexShardStatus shard : shards) { + totalShards++; + switch (shard.getStage()) { + case INIT: + initializingShards++; + break; + case STARTED: + startedShards++; + break; + case FINALIZE: + finalizingShards++; + break; + case DONE: + doneShards++; + break; + case FAILURE: + failedShards++; + break; + default: + throw new ElasticsearchIllegalArgumentException("Unknown stage type " + shard.getStage()); + } + } + } + + /** + * Number of shards with the snapshot in the initializing stage + */ + public int getInitializingShards() { + return initializingShards; + } + + /** + * Number of shards with the snapshot in the started stage + */ + public int getStartedShards() { + return startedShards; + } + + /** + * Number of shards with the snapshot in the finalizing stage + */ + public int getFinalizingShards() { + return finalizingShards; + } + + /** + * Number of shards with completed snapshot + */ + public int getDoneShards() { + return doneShards; + } + + /** + * Number of shards with failed snapshot + */ + public int getFailedShards() { + return failedShards; + } + + /** + * Total number of shards + */ + public int getTotalShards() { + return totalShards; + } + + static final class Fields { + static final XContentBuilderString SHARDS_STATS = new XContentBuilderString("shards_stats"); + static final XContentBuilderString INITIALIZING = new XContentBuilderString("initializing"); + static final XContentBuilderString STARTED = new XContentBuilderString("started"); + static final XContentBuilderString FINALIZING = new XContentBuilderString("finalizing"); + static final XContentBuilderString DONE = new XContentBuilderString("done"); + static final XContentBuilderString FAILED = new XContentBuilderString("failed"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(Fields.SHARDS_STATS); + builder.field(Fields.INITIALIZING, getInitializingShards()); + builder.field(Fields.STARTED, getStartedShards()); + builder.field(Fields.FINALIZING, getFinalizingShards()); + builder.field(Fields.DONE, getDoneShards()); + builder.field(Fields.FAILED, getFailedShards()); + builder.field(Fields.TOTAL, getTotalShards()); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java new file mode 100644 index 00000000000..5dd2617fea2 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; + +import java.io.IOException; + +/** + */ +public class SnapshotStats implements Streamable, ToXContent { + private long startTime; + + private long time; + + private int numberOfFiles; + + private int processedFiles; + + private long totalSize; + + private long processedSize; + + SnapshotStats() { + } + + SnapshotStats(IndexShardSnapshotStatus indexShardStatus) { + startTime = indexShardStatus.startTime(); + time = indexShardStatus.time(); + numberOfFiles = indexShardStatus.numberOfFiles(); + processedFiles = indexShardStatus.processedFiles(); + totalSize = indexShardStatus.totalSize(); + processedSize = indexShardStatus.processedSize(); + } + + /** + * Returns time when snapshot started + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns snapshot running time + */ + public long getTime() { + return time; + } + + /** + * Returns number of files in the snapshot + */ + public int getNumberOfFiles() { + return numberOfFiles; + } + + /** + * Returns number of files in the snapshot that were processed so far + */ + public int getProcessedFiles() { + return processedFiles; + } + + /** + * Returns total size of files in the snapshot + */ + public long getTotalSize() { + return totalSize; + } + + /** + * Returns total size of files in the snapshot that were processed so far + */ + public long getProcessedSize() { + return processedSize; + } + + + public static SnapshotStats readSnapshotStats(StreamInput in) throws IOException { + SnapshotStats stats = new SnapshotStats(); + stats.readFrom(in); + return stats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(startTime); + out.writeVLong(time); + + out.writeVInt(numberOfFiles); + out.writeVInt(processedFiles); + + out.writeVLong(totalSize); + out.writeVLong(processedSize); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + startTime = in.readVLong(); + time = in.readVLong(); + + numberOfFiles = in.readVInt(); + processedFiles = in.readVInt(); + + totalSize = in.readVLong(); + processedSize = in.readVLong(); + } + + static final class Fields { + static final XContentBuilderString STATS = new XContentBuilderString("stats"); + static final XContentBuilderString NUMBER_OF_FILES = new XContentBuilderString("number_of_files"); + static final XContentBuilderString PROCESSED_FILES = new XContentBuilderString("processed_files"); + static final XContentBuilderString TOTAL_SIZE_IN_BYTES = new XContentBuilderString("total_size_in_bytes"); + static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size"); + static final XContentBuilderString PROCESSED_SIZE_IN_BYTES = new XContentBuilderString("processed_size_in_bytes"); + static final XContentBuilderString PROCESSED_SIZE = new XContentBuilderString("processed_size"); + static final XContentBuilderString START_TIME_IN_MILLIS = new XContentBuilderString("start_time_in_millis"); + static final XContentBuilderString TIME_IN_MILLIS = new XContentBuilderString("time_in_millis"); + static final XContentBuilderString TIME = new XContentBuilderString("time"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(Fields.STATS); + builder.field(Fields.NUMBER_OF_FILES, getNumberOfFiles()); + builder.field(Fields.PROCESSED_FILES, getProcessedFiles()); + builder.byteSizeField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, getTotalSize()); + builder.byteSizeField(Fields.PROCESSED_SIZE_IN_BYTES, Fields.PROCESSED_SIZE, getProcessedSize()); + builder.field(Fields.START_TIME_IN_MILLIS, getStartTime()); + builder.timeValueField(Fields.TIME_IN_MILLIS, Fields.TIME, getTime()); + builder.endObject(); + return builder; + } + + void add(SnapshotStats stats) { + numberOfFiles += stats.numberOfFiles; + processedFiles += stats.processedFiles; + + totalSize += stats.totalSize; + processedSize += stats.processedSize; + + + if (startTime == 0) { + // First time here + startTime = stats.startTime; + time = stats.time; + } else { + // The time the last snapshot ends + long endTime = Math.max(startTime + time, stats.startTime + stats.time); + + // The time the first snapshot starts + startTime = Math.min(startTime, stats.startTime); + + // Update duration + time = endTime - startTime; + } + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java new file mode 100644 index 00000000000..79da77b89a6 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -0,0 +1,198 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.metadata.SnapshotMetaData.State; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Sets.newHashSet; + +/** + * Status of a snapshot + */ +public class SnapshotStatus implements ToXContent, Streamable { + + private SnapshotId snapshotId; + + private State state; + + private ImmutableList shards; + + private ImmutableMap indicesStatus; + + private SnapshotShardsStats shardsStats; + + private SnapshotStats stats; + + + SnapshotStatus(SnapshotId snapshotId, State state, ImmutableList shards) { + this.snapshotId = snapshotId; + this.state = state; + this.shards = shards; + shardsStats = new SnapshotShardsStats(shards); + updateShardStats(); + } + + SnapshotStatus() { + } + + /** + * Returns snapshot id + */ + public SnapshotId getSnapshotId() { + return snapshotId; + } + + /** + * Returns snapshot state + */ + public State getState() { + return state; + } + + /** + * Returns list of snapshot shards + */ + public List getShards() { + return shards; + } + + public SnapshotShardsStats getShardsStats() { + return shardsStats; + } + + /** + * Returns list of snapshot indices + */ + public Map getIndices() { + if (this.indicesStatus != null) { + return this.indicesStatus; + } + + ImmutableMap.Builder indicesStatus = ImmutableMap.builder(); + + Set indices = newHashSet(); + for (SnapshotIndexShardStatus shard : shards) { + indices.add(shard.getIndex()); + } + + for (String index : indices) { + List shards = newArrayList(); + for (SnapshotIndexShardStatus shard : this.shards) { + if (shard.getIndex().equals(index)) { + shards.add(shard); + } + } + indicesStatus.put(index, new SnapshotIndexStatus(index, shards)); + } + this.indicesStatus = indicesStatus.build(); + return this.indicesStatus; + + } + + @Override + public void readFrom(StreamInput in) throws IOException { + snapshotId = SnapshotId.readSnapshotId(in); + state = State.fromValue(in.readByte()); + int size = in.readVInt(); + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < size; i++) { + builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in)); + } + shards = builder.build(); + updateShardStats(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + snapshotId.writeTo(out); + out.writeByte(state.value()); + out.writeVInt(shards.size()); + for (SnapshotIndexShardStatus shard : shards) { + shard.writeTo(out); + } + } + + /** + * Reads snapshot status from stream input + * + * @param in stream input + * @return deserialized snapshot status + * @throws IOException + */ + public static SnapshotStatus readSnapshotStatus(StreamInput in) throws IOException { + SnapshotStatus snapshotInfo = new SnapshotStatus(); + snapshotInfo.readFrom(in); + return snapshotInfo; + } + + /** + * Returns number of files in the snapshot + */ + public SnapshotStats getStats() { + return stats; + } + + static final class Fields { + static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot"); + static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository"); + static final XContentBuilderString STATE = new XContentBuilderString("state"); + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Fields.SNAPSHOT, snapshotId.getSnapshot()); + builder.field(Fields.REPOSITORY, snapshotId.getRepository()); + builder.field(Fields.STATE, state.name()); + shardsStats.toXContent(builder, params); + stats.toXContent(builder, params); + builder.startObject(Fields.INDICES); + for (SnapshotIndexStatus indexStatus : getIndices().values()) { + indexStatus.toXContent(builder, params); + } + builder.endObject(); + builder.endObject(); + return builder; + } + + private void updateShardStats() { + stats = new SnapshotStats(); + shardsStats = new SnapshotShardsStats(shards); + for (SnapshotIndexShardStatus shard : shards) { + stats.add(shard.getStats()); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusAction.java new file mode 100644 index 00000000000..0e4ba489020 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusAction.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import org.elasticsearch.action.admin.cluster.ClusterAction; +import org.elasticsearch.client.ClusterAdminClient; + +/** + * Snapshots status action + */ +public class SnapshotsStatusAction extends ClusterAction { + + public static final SnapshotsStatusAction INSTANCE = new SnapshotsStatusAction(); + public static final String NAME = "cluster/snapshot/status"; + + private SnapshotsStatusAction() { + super(NAME); + } + + @Override + public SnapshotsStatusResponse newResponse() { + return new SnapshotsStatusResponse(); + } + + @Override + public SnapshotsStatusRequestBuilder newRequestBuilder(ClusterAdminClient client) { + return new SnapshotsStatusRequestBuilder(client); + } +} + diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java new file mode 100644 index 00000000000..b5b3815d5d3 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Get snapshot status request + */ +public class SnapshotsStatusRequest extends MasterNodeOperationRequest { + + private String repository; + + private String[] snapshots = Strings.EMPTY_ARRAY; + + SnapshotsStatusRequest() { + } + + /** + * Constructs a new get snapshots request with given repository name and list of snapshots + * + * @param repository repository name + * @param snapshots list of snapshots + */ + public SnapshotsStatusRequest(String repository, String[] snapshots) { + this.repository = repository; + this.snapshots = snapshots; + } + + /** + * Constructs a new get snapshots request with given repository name + * + * @param repository repository name + */ + public SnapshotsStatusRequest(String repository) { + this.repository = repository; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (repository == null) { + validationException = addValidationError("repository is missing", validationException); + } + if (snapshots == null) { + validationException = addValidationError("snapshots is null", validationException); + } + return validationException; + } + + /** + * Sets repository name + * + * @param repository repository name + * @return this request + */ + public SnapshotsStatusRequest repository(String repository) { + this.repository = repository; + return this; + } + + /** + * Returns repository name + * + * @return repository name + */ + public String repository() { + return this.repository; + } + + /** + * Returns the names of the snapshots. + * + * @return the names of snapshots + */ + public String[] snapshots() { + return this.snapshots; + } + + /** + * Sets the list of snapshots to be returned + * + * @param snapshots + * @return this request + */ + public SnapshotsStatusRequest snapshots(String[] snapshots) { + this.snapshots = snapshots; + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + repository = in.readString(); + snapshots = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(repository); + out.writeStringArray(snapshots); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java new file mode 100644 index 00000000000..e29fe8f56e6 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import com.google.common.collect.ObjectArrays; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.client.internal.InternalClusterAdminClient; + +/** + * Snapshots status request builder + */ +public class SnapshotsStatusRequestBuilder extends MasterNodeOperationRequestBuilder { + + /** + * Constructs the new snapshotstatus request + * + * @param clusterAdminClient cluster admin client + */ + public SnapshotsStatusRequestBuilder(ClusterAdminClient clusterAdminClient) { + super((InternalClusterAdminClient) clusterAdminClient, new SnapshotsStatusRequest()); + } + + /** + * Constructs the new snapshot status request with specified repository + * + * @param clusterAdminClient cluster admin client + * @param repository repository name + */ + public SnapshotsStatusRequestBuilder(ClusterAdminClient clusterAdminClient, String repository) { + super((InternalClusterAdminClient) clusterAdminClient, new SnapshotsStatusRequest(repository)); + } + + /** + * Sets the repository name + * + * @param repository repository name + * @return this builder + */ + public SnapshotsStatusRequestBuilder setRepository(String repository) { + request.repository(repository); + return this; + } + + /** + * Sets list of snapshots to return + * + * @param snapshots list of snapshots + * @return this builder + */ + public SnapshotsStatusRequestBuilder setSnapshots(String... snapshots) { + request.snapshots(snapshots); + return this; + } + + /** + * Adds additional snapshots to the list of snapshots to return + * + * @param snapshots additional snapshots + * @return this builder + */ + public SnapshotsStatusRequestBuilder addSnapshots(String... snapshots) { + request.snapshots(ObjectArrays.concat(request.snapshots(), snapshots, String.class)); + return this; + } + + @Override + protected void doExecute(ActionListener listener) { + ((ClusterAdminClient) client).snapshotsStatus(request, listener); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java new file mode 100644 index 00000000000..6191a45d6b3 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import com.google.common.collect.ImmutableList; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +/** + * Snapshot status response + */ +public class SnapshotsStatusResponse extends ActionResponse implements ToXContent { + + private ImmutableList snapshots = ImmutableList.of(); + + SnapshotsStatusResponse() { + } + + SnapshotsStatusResponse(ImmutableList snapshots) { + this.snapshots = snapshots; + } + + /** + * Returns the list of snapshots + * + * @return the list of snapshots + */ + public ImmutableList getSnapshots() { + return snapshots; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < size; i++) { + builder.add(SnapshotStatus.readSnapshotStatus(in)); + } + snapshots = builder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(snapshots.size()); + for (SnapshotStatus snapshotInfo : snapshots) { + snapshotInfo.writeTo(out); + } + } + + static final class Fields { + static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(Fields.SNAPSHOTS); + for (SnapshotStatus snapshot : snapshots) { + snapshot.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java new file mode 100644 index 00000000000..65dedb20319 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -0,0 +1,308 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.*; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * Transport client that collects snapshot shard statuses from data nodes + */ +public class TransportNodesSnapshotsStatus extends TransportNodesOperationAction { + + private final SnapshotsService snapshotsService; + + @Inject + public TransportNodesSnapshotsStatus(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, SnapshotsService snapshotsService) { + super(settings, clusterName, threadPool, clusterService, transportService); + this.snapshotsService = snapshotsService; + } + + public void status(String[] nodesIds, SnapshotId[] snapshotIds, @Nullable TimeValue timeout, ActionListener listener) { + execute(new Request(nodesIds).snapshotIds(snapshotIds).timeout(timeout), listener); + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected String transportAction() { + return "cluster/snapshot/status/nodes"; + } + + @Override + protected boolean transportCompress() { + return true; // compress since the metadata can become large + } + + @Override + protected Request newRequest() { + return new Request(); + } + + @Override + protected NodeRequest newNodeRequest() { + return new NodeRequest(); + } + + @Override + protected NodeRequest newNodeRequest(String nodeId, Request request) { + return new NodeRequest(nodeId, request); + } + + @Override + protected NodeSnapshotStatus newNodeResponse() { + return new NodeSnapshotStatus(); + } + + @Override + protected NodesSnapshotStatus newResponse(Request request, AtomicReferenceArray responses) { + final List nodesList = Lists.newArrayList(); + final List failures = Lists.newArrayList(); + for (int i = 0; i < responses.length(); i++) { + Object resp = responses.get(i); + if (resp instanceof NodeSnapshotStatus) { // will also filter out null response for unallocated ones + nodesList.add((NodeSnapshotStatus) resp); + } else if (resp instanceof FailedNodeException) { + failures.add((FailedNodeException) resp); + } else { + logger.warn("unknown response type [{}], expected NodeSnapshotStatus or FailedNodeException", resp); + } + } + return new NodesSnapshotStatus(clusterName, nodesList.toArray(new NodeSnapshotStatus[nodesList.size()]), + failures.toArray(new FailedNodeException[failures.size()])); + } + + @Override + protected NodeSnapshotStatus nodeOperation(NodeRequest request) throws ElasticsearchException { + ImmutableMap.Builder> snapshotMapBuilder = ImmutableMap.builder(); + try { + String nodeId = clusterService.localNode().id(); + for (SnapshotId snapshotId : request.snapshotIds) { + ImmutableMap shardsStatus = snapshotsService.currentSnapshotShards(snapshotId); + if (shardsStatus == null) { + continue; + } + ImmutableMap.Builder shardMapBuilder = ImmutableMap.builder(); + for (ImmutableMap.Entry shardEntry : shardsStatus.entrySet()) { + SnapshotIndexShardStatus shardStatus; + IndexShardSnapshotStatus.Stage stage = shardEntry.getValue().stage(); + if (stage != IndexShardSnapshotStatus.Stage.DONE && stage != IndexShardSnapshotStatus.Stage.FAILURE) { + // Store node id for the snapshots that are currently running. + shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue(), nodeId); + } else { + shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue()); + } + shardMapBuilder.put(shardEntry.getKey(), shardStatus); + } + snapshotMapBuilder.put(snapshotId, shardMapBuilder.build()); + } + return new NodeSnapshotStatus(clusterService.localNode(), snapshotMapBuilder.build()); + } catch (Exception e) { + throw new ElasticsearchException("failed to load metadata", e); + } + } + + @Override + protected boolean accumulateExceptions() { + return true; + } + + static class Request extends NodesOperationRequest { + + private SnapshotId[] snapshotIds; + + public Request() { + } + + public Request(String[] nodesIds) { + super(nodesIds); + } + + public Request snapshotIds(SnapshotId[] snapshotIds) { + this.snapshotIds = snapshotIds; + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + // This operation is never executed remotely + throw new UnsupportedOperationException("shouldn't be here"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // This operation is never executed remotely + throw new UnsupportedOperationException("shouldn't be here"); + } + } + + public static class NodesSnapshotStatus extends NodesOperationResponse { + + private FailedNodeException[] failures; + + NodesSnapshotStatus() { + } + + public NodesSnapshotStatus(ClusterName clusterName, NodeSnapshotStatus[] nodes, FailedNodeException[] failures) { + super(clusterName, nodes); + this.failures = failures; + } + + public FailedNodeException[] failures() { + return failures; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodes = new NodeSnapshotStatus[in.readVInt()]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = new NodeSnapshotStatus(); + nodes[i].readFrom(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(nodes.length); + for (NodeSnapshotStatus response : nodes) { + response.writeTo(out); + } + } + } + + + static class NodeRequest extends NodeOperationRequest { + + private SnapshotId[] snapshotIds; + + NodeRequest() { + } + + NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) { + super(request, nodeId); + snapshotIds = request.snapshotIds; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int n = in.readVInt(); + snapshotIds = new SnapshotId[n]; + for (int i = 0; i < n; i++) { + snapshotIds[i] = SnapshotId.readSnapshotId(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (snapshotIds != null) { + out.writeVInt(snapshotIds.length); + for (int i = 0; i < snapshotIds.length; i++) { + snapshotIds[i].writeTo(out); + } + } else { + out.writeVInt(0); + } + } + } + + public static class NodeSnapshotStatus extends NodeOperationResponse { + + private ImmutableMap> status; + + NodeSnapshotStatus() { + } + + public NodeSnapshotStatus(DiscoveryNode node, ImmutableMap> status) { + super(node); + this.status = status; + } + + public ImmutableMap> status() { + return status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int numberOfSnapshots = in.readVInt(); + ImmutableMap.Builder> snapshotMapBuilder = ImmutableMap.builder(); + for (int i = 0; i < numberOfSnapshots; i++) { + SnapshotId snapshotId = SnapshotId.readSnapshotId(in); + ImmutableMap.Builder shardMapBuilder = ImmutableMap.builder(); + int numberOfShards = in.readVInt(); + for (int j = 0; j < numberOfShards; j++) { + ShardId shardId = ShardId.readShardId(in); + SnapshotIndexShardStatus status = SnapshotIndexShardStatus.readShardSnapshotStatus(in); + shardMapBuilder.put(shardId, status); + } + snapshotMapBuilder.put(snapshotId, shardMapBuilder.build()); + } + status = snapshotMapBuilder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (status != null) { + out.writeVInt(status.size()); + for (ImmutableMap.Entry> entry : status.entrySet()) { + entry.getKey().writeTo(out); + out.writeVInt(entry.getValue().size()); + for (ImmutableMap.Entry shardEntry : entry.getValue().entrySet()) { + shardEntry.getKey().writeTo(out); + shardEntry.getValue().writeTo(out); + } + } + } else { + out.writeVInt(0); + } + } + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java new file mode 100644 index 00000000000..f8f1b15d955 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -0,0 +1,222 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.status; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.metadata.SnapshotMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.Set; + +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Sets.newHashSet; + +/** + */ +public class TransportSnapshotsStatusAction extends TransportMasterNodeOperationAction { + + private final SnapshotsService snapshotsService; + + private final TransportNodesSnapshotsStatus transportNodesSnapshotsStatus; + + @Inject + public TransportSnapshotsStatusAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, SnapshotsService snapshotsService, TransportNodesSnapshotsStatus transportNodesSnapshotsStatus) { + super(settings, transportService, clusterService, threadPool); + this.snapshotsService = snapshotsService; + this.transportNodesSnapshotsStatus = transportNodesSnapshotsStatus; + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected String transportAction() { + return SnapshotsStatusAction.NAME; + } + + @Override + protected SnapshotsStatusRequest newRequest() { + return new SnapshotsStatusRequest(); + } + + @Override + protected SnapshotsStatusResponse newResponse() { + return new SnapshotsStatusResponse(); + } + + @Override + protected void masterOperation(final SnapshotsStatusRequest request, + final ClusterState state, + final ActionListener listener) throws ElasticsearchException { + ImmutableList currentSnapshots = snapshotsService.currentSnapshots(request.repository(), request.snapshots()); + + if (currentSnapshots.isEmpty()) { + buildResponse(request, currentSnapshots, null); + } + + Set nodesIds = newHashSet(); + for (SnapshotMetaData.Entry entry : currentSnapshots) { + for (SnapshotMetaData.ShardSnapshotStatus status : entry.shards().values()) { + if (status.nodeId() != null) { + nodesIds.add(status.nodeId()); + } + } + } + + if (!nodesIds.isEmpty()) { + // There are still some snapshots running - check their progress + SnapshotId[] snapshotIds = new SnapshotId[currentSnapshots.size()]; + for (int i = 0; i < currentSnapshots.size(); i++) { + snapshotIds[i] = currentSnapshots.get(i).snapshotId(); + } + + transportNodesSnapshotsStatus.status(nodesIds.toArray(new String[nodesIds.size()]), + snapshotIds, request.masterNodeTimeout(), new ActionListener() { + @Override + public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) { + ImmutableList currentSnapshots = + snapshotsService.currentSnapshots(request.repository(), request.snapshots()); + listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses)); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } else { + // We don't have any in-progress shards, just return current stats + listener.onResponse(buildResponse(request, currentSnapshots, null)); + } + + } + + private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, ImmutableList currentSnapshots, + TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) { + // First process snapshot that are currently processed + ImmutableList.Builder builder = ImmutableList.builder(); + Set currentSnapshotIds = newHashSet(); + if (!currentSnapshots.isEmpty()) { + Map nodeSnapshotStatusMap; + if (nodeSnapshotStatuses != null) { + nodeSnapshotStatusMap = nodeSnapshotStatuses.getNodesMap(); + } else { + nodeSnapshotStatusMap = newHashMap(); + } + + for (SnapshotMetaData.Entry entry : currentSnapshots) { + currentSnapshotIds.add(entry.snapshotId()); + ImmutableList.Builder shardStatusBuilder = ImmutableList.builder(); + for (ImmutableMap.Entry shardEntry : entry.shards().entrySet()) { + SnapshotMetaData.ShardSnapshotStatus status = shardEntry.getValue(); + if (status.nodeId() != null) { + // We should have information about this shard from the shard: + TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId()); + if (nodeStatus != null) { + ImmutableMap shardStatues = nodeStatus.status().get(entry.snapshotId()); + if (shardStatues != null) { + SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.getKey()); + if (shardStatus != null) { + // We have full information about this shard + shardStatusBuilder.add(shardStatus); + continue; + } + } + } + } + final SnapshotIndexShardStage stage; + switch (shardEntry.getValue().state()) { + case FAILED: + case ABORTED: + case MISSING: + stage = SnapshotIndexShardStage.FAILURE; + break; + case INIT: + case STARTED: + stage = SnapshotIndexShardStage.STARTED; + break; + case SUCCESS: + stage = SnapshotIndexShardStage.DONE; + break; + default: + throw new ElasticsearchIllegalArgumentException("Unknown snapshot state " + shardEntry.getValue().state()); + } + SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey().getIndex(), shardEntry.getKey().getId(), stage); + shardStatusBuilder.add(shardStatus); + } + builder.add(new SnapshotStatus(entry.snapshotId(), entry.state(), shardStatusBuilder.build())); + } + } + // Now add snapshots on disk that are not currently running + if (Strings.hasText(request.repository())) { + if (request.snapshots() != null && request.snapshots().length > 0) { + for (String snapshotName : request.snapshots()) { + SnapshotId snapshotId = new SnapshotId(request.repository(), snapshotName); + if (currentSnapshotIds.contains(snapshotId)) { + // This is a snapshot the is currently running - skipping + continue; + } + Snapshot snapshot = snapshotsService.snapshot(snapshotId); + ImmutableList.Builder shardStatusBuilder = ImmutableList.builder(); + if (snapshot.state().completed()) { + ImmutableMap shardStatues = snapshotsService.snapshotShards(snapshotId); + for (ImmutableMap.Entry shardStatus : shardStatues.entrySet()) { + shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), shardStatus.getValue())); + } + final SnapshotMetaData.State state; + switch (snapshot.state()) { + case FAILED: + state = SnapshotMetaData.State.FAILED; + break; + case SUCCESS: + state = SnapshotMetaData.State.SUCCESS; + break; + default: + throw new ElasticsearchIllegalArgumentException("Unknown snapshot state " + snapshot.state()); + } + builder.add(new SnapshotStatus(snapshotId, state, shardStatusBuilder.build())); + } + } + } + } + + return new SnapshotsStatusResponse(builder.build()); + } + +} diff --git a/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index cefe335e3c5..46cebf77301 100644 --- a/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -69,6 +69,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequestBuilder; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -429,4 +432,19 @@ public interface ClusterAdminClient { */ PendingClusterTasksRequestBuilder preparePendingClusterTasks(); + /** + * Get snapshot status. + */ + ActionFuture snapshotsStatus(SnapshotsStatusRequest request); + + /** + * Get snapshot status. + */ + void snapshotsStatus(SnapshotsStatusRequest request, ActionListener listener); + + /** + * Get snapshot status. + */ + SnapshotsStatusRequestBuilder prepareSnapshotStatus(String repository); + } diff --git a/src/main/java/org/elasticsearch/client/Requests.java b/src/main/java/org/elasticsearch/client/Requests.java index 8d5d3f9ee1b..7750a6ba6e4 100644 --- a/src/main/java/org/elasticsearch/client/Requests.java +++ b/src/main/java/org/elasticsearch/client/Requests.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotReq import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -536,7 +537,7 @@ public class Requests { } /** - * Restores new snapshot + * Deletes a snapshot * * @param snapshot snapshot name * @param repository repository name @@ -545,4 +546,15 @@ public class Requests { public static DeleteSnapshotRequest deleteSnapshotRequest(String repository, String snapshot) { return new DeleteSnapshotRequest(repository, snapshot); } + + /** + * Get status of snapshots + * + * @param repository repository name + * @return snapshot status request + */ + public static SnapshotsStatusRequest snapshotsStatusRequest(String repository) { + return new SnapshotsStatusRequest(repository); + } + } diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java index 6b6944eec8b..eb8a9d47b56 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java @@ -85,6 +85,10 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotA import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequestBuilder; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; @@ -399,4 +403,20 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin public RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot) { return new RestoreSnapshotRequestBuilder(this, repository, snapshot); } + + + @Override + public ActionFuture snapshotsStatus(SnapshotsStatusRequest request) { + return execute(SnapshotsStatusAction.INSTANCE, request); + } + + @Override + public void snapshotsStatus(SnapshotsStatusRequest request, ActionListener listener) { + execute(SnapshotsStatusAction.INSTANCE, request, listener); + } + + @Override + public SnapshotsStatusRequestBuilder prepareSnapshotStatus(String repository) { + return new SnapshotsStatusRequestBuilder(this, repository); + } } diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java index 1d133281fba..14be31fcb87 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java @@ -107,6 +107,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA nodesList.add((NodeLocalGatewayMetaState) resp); } else if (resp instanceof FailedNodeException) { failures.add((FailedNodeException) resp); + } else { + logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp); } } return new NodesLocalGatewayMetaState(clusterName, nodesList.toArray(new NodeLocalGatewayMetaState[nodesList.size()]), diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java index 49edb0410bd..3163dea526e 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java @@ -108,6 +108,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat nodesList.add((NodeLocalGatewayStartedShards) resp); } else if (resp instanceof FailedNodeException) { failures.add((FailedNodeException) resp); + } else { + logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException", resp); } } return new NodesLocalGatewayStartedShards(clusterName, nodesList.toArray(new NodeLocalGatewayStartedShards[nodesList.size()]), diff --git a/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java index 760a39753e3..6a185f81b79 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java @@ -60,4 +60,13 @@ public interface IndexShardRepository { */ void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryStatus recoveryStatus); + /** + * Retrieve shard snapshot status for the stored snapshot + * + * @param snapshotId snapshot id + * @param shardId shard id + * @return snapshot status + */ + IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId); + } diff --git a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index cb9c7bec233..d335e03d9a9 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -19,6 +19,9 @@ package org.elasticsearch.index.snapshots; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * Represent shard snapshot status */ @@ -58,12 +61,18 @@ public class IndexShardSnapshotStatus { private int numberOfFiles; + private volatile int processedFiles; + private long totalSize; + private volatile long processedSize; + private long indexVersion; private boolean aborted; + private String failure; + /** * Returns current snapshot stage * @@ -145,6 +154,25 @@ public class IndexShardSnapshotStatus { this.totalSize = totalSize; } + /** + * Sets processed files stats + * + * @param numberOfFiles number of files in this snapshot + * @param totalSize total size of files in this snapshot + */ + public synchronized void processedFiles(int numberOfFiles, long totalSize) { + processedFiles = numberOfFiles; + processedSize = totalSize; + } + + /** + * Increments number of processed files + */ + public synchronized void addProcessedFile(long size) { + processedFiles++; + processedSize += size; + } + /** * Number of files * @@ -163,6 +191,25 @@ public class IndexShardSnapshotStatus { return totalSize; } + /** + * Number of processed files + * + * @return number of processed files + */ + public int processedFiles() { + return processedFiles; + } + + /** + * Size of processed files + * + * @return size of processed files + */ + public long processedSize() { + return processedSize; + } + + /** * Sets index version * @@ -180,4 +227,18 @@ public class IndexShardSnapshotStatus { public long indexVersion() { return indexVersion; } + + /** + * Sets the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state + */ + public void failure(String failure) { + this.failure = failure; + } + + /** + * Returns the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state + */ + public String failure() { + return failure; + } } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index b62d85d55aa..c47dedc6191 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.component.AbstractComponent; @@ -50,6 +51,7 @@ import org.elasticsearch.repositories.RepositoryName; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -130,6 +132,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } catch (Throwable e) { snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); + snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); if (e instanceof IndexShardSnapshotFailedException) { throw (IndexShardSnapshotFailedException) e; } else { @@ -154,6 +157,23 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } } + /** + * {@inheritDoc} + */ + @Override + public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId) { + Context context = new Context(snapshotId, shardId); + BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); + IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); + status.updateStage(IndexShardSnapshotStatus.Stage.DONE); + status.startTime(snapshot.startTime()); + status.files(snapshot.numberOfFiles(), snapshot.totalSize()); + // The snapshot is done which means the number of processed files is the same as total + status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); + status.time(snapshot.time()); + return status; + } + /** * Delete shard snapshot * @@ -266,6 +286,19 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements cleanup(newSnapshotsList, blobs); } + /** + * Loads information about shard snapshot + */ + public BlobStoreIndexShardSnapshot loadSnapshot() { + BlobStoreIndexShardSnapshot snapshot; + try { + snapshot = readSnapshot(blobContainer.readBlobFully(snapshotBlobName(snapshotId))); + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); + } + return snapshot; + } + /** * Removes all unreferenced files from the repository * @@ -343,6 +376,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } return new BlobStoreIndexShardSnapshots(snapshots); } + } /** @@ -370,7 +404,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements /** * Create snapshot from index commit point * - * @param snapshotIndexCommit + * @param snapshotIndexCommit snapshot commit point */ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName); @@ -385,14 +419,12 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements long generation = findLatestFileNameGeneration(blobs); BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); - - final CountDownLatch indexLatch = new CountDownLatch(snapshotIndexCommit.getFiles().length); final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); final List indexCommitPointFiles = newArrayList(); int indexNumberOfFiles = 0; long indexTotalFilesSize = 0; + ArrayList filesToSnapshot = newArrayList(); for (String fileName : snapshotIndexCommit.getFiles()) { if (snapshotStatus.aborted()) { logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); @@ -423,20 +455,28 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements indexNumberOfFiles++; indexTotalFilesSize += md.length(); // create a new FileInfo - try { - BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), chunkSize, md.checksum()); - indexCommitPointFiles.add(snapshotFileInfo); - snapshotFile(snapshotFileInfo, indexLatch, failures); - } catch (IOException e) { - failures.add(e); - } + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), chunkSize, md.checksum()); + indexCommitPointFiles.add(snapshotFileInfo); + filesToSnapshot.add(snapshotFileInfo); } else { indexCommitPointFiles.add(fileInfo); - indexLatch.countDown(); } } snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); + + snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); + + final CountDownLatch indexLatch = new CountDownLatch(filesToSnapshot.size()); + + for (FileInfo snapshotFileInfo : filesToSnapshot) { + try { + snapshotFile(snapshotFileInfo, indexLatch, failures); + } catch (IOException e) { + failures.add(e); + } + } + snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); try { @@ -453,7 +493,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); String commitPointName = snapshotBlobName(snapshotId); - BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), snapshotIndexCommit.getGeneration(), indexCommitPointFiles); + BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), + snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), + // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong + System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); + //TODO: The time stored in snapshot doesn't include cleanup time. try { byte[] snapshotData = writeSnapshot(snapshot); logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); @@ -506,6 +550,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements @Override public void onCompleted() { IOUtils.closeWhileHandlingException(fIndexInput); + snapshotStatus.addProcessedFile(fileInfo.length()); if (counter.decrementAndGet() == 0) { latch.countDown(); } @@ -514,6 +559,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements @Override public void onFailure(Throwable t) { IOUtils.closeWhileHandlingException(fIndexInput); + snapshotStatus.addProcessedFile(0); failures.add(t); if (counter.decrementAndGet() == 0) { latch.countDown(); @@ -613,12 +659,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements */ public void restore() { logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); - BlobStoreIndexShardSnapshot snapshot; - try { - snapshot = readSnapshot(blobContainer.readBlobFully(snapshotBlobName(snapshotId))); - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); - } + BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX); int numberOfFiles = 0; diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index 67a6ec4dcf3..cd8674fd9a4 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.snapshots.blobstore; import com.google.common.collect.ImmutableList; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -269,21 +270,38 @@ public class BlobStoreIndexShardSnapshot { private final long indexVersion; + private final long startTime; + + private final long time; + + private final int numberOfFiles; + + private final long totalSize; + private final ImmutableList indexFiles; /** * Constructs new shard snapshot metadata from snapshot metadata * - * @param snapshot snapshot id - * @param indexVersion index version - * @param indexFiles list of files in the shard + * @param snapshot snapshot id + * @param indexVersion index version + * @param indexFiles list of files in the shard + * @param startTime snapshot start time + * @param time snapshot running time + * @param numberOfFiles number of files that where snapshotted + * @param totalSize total size of all files snapshotted */ - public BlobStoreIndexShardSnapshot(String snapshot, long indexVersion, List indexFiles) { + public BlobStoreIndexShardSnapshot(String snapshot, long indexVersion, List indexFiles, long startTime, long time, + int numberOfFiles, long totalSize) { assert snapshot != null; assert indexVersion >= 0; this.snapshot = snapshot; this.indexVersion = indexVersion; this.indexFiles = ImmutableList.copyOf(indexFiles); + this.startTime = startTime; + this.time = time; + this.numberOfFiles = numberOfFiles; + this.totalSize = totalSize; } /** @@ -313,6 +331,55 @@ public class BlobStoreIndexShardSnapshot { return indexFiles; } + /** + * Returns snapshot start time + */ + public long startTime() { + return startTime; + } + + /** + * Returns snapshot running time + */ + public long time() { + return time; + } + + /** + * Returns number of files that where snapshotted + */ + public int numberOfFiles() { + return numberOfFiles; + } + + /** + * Returns total size of all files that where snapshotted + */ + public long totalSize() { + return totalSize; + } + + static final class Fields { + static final XContentBuilderString NAME = new XContentBuilderString("name"); + static final XContentBuilderString INDEX_VERSION = new XContentBuilderString("index_version"); + static final XContentBuilderString START_TIME = new XContentBuilderString("start_time"); + static final XContentBuilderString TIME = new XContentBuilderString("time"); + static final XContentBuilderString NUMBER_OF_FILES = new XContentBuilderString("number_of_files"); + static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size"); + static final XContentBuilderString FILES = new XContentBuilderString("files"); + } + + static final class ParseFields { + static final ParseField NAME = new ParseField("name"); + static final ParseField INDEX_VERSION = new ParseField("index_version", "index-version"); + static final ParseField START_TIME = new ParseField("start_time"); + static final ParseField TIME = new ParseField("time"); + static final ParseField NUMBER_OF_FILES = new ParseField("number_of_files"); + static final ParseField TOTAL_SIZE = new ParseField("total_size"); + static final ParseField FILES = new ParseField("files"); + } + + /** * Serializes shard snapshot metadata info into JSON * @@ -323,9 +390,13 @@ public class BlobStoreIndexShardSnapshot { */ public static void toXContent(BlobStoreIndexShardSnapshot snapshot, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); - builder.field("name", snapshot.snapshot); - builder.field("index-version", snapshot.indexVersion); - builder.startArray("files"); + builder.field(Fields.NAME, snapshot.snapshot); + builder.field(Fields.INDEX_VERSION, snapshot.indexVersion); + builder.field(Fields.START_TIME, snapshot.startTime); + builder.field(Fields.TIME, snapshot.time); + builder.field(Fields.NUMBER_OF_FILES, snapshot.numberOfFiles); + builder.field(Fields.TOTAL_SIZE, snapshot.totalSize); + builder.startArray(Fields.FILES); for (FileInfo fileInfo : snapshot.indexFiles) { FileInfo.toXContent(fileInfo, builder, params); } @@ -344,6 +415,10 @@ public class BlobStoreIndexShardSnapshot { String snapshot = null; long indexVersion = -1; + long startTime = 0; + long time = 0; + int numberOfFiles = 0; + long totalSize = 0; List indexFiles = newArrayList(); @@ -354,16 +429,29 @@ public class BlobStoreIndexShardSnapshot { String currentFieldName = parser.currentName(); token = parser.nextToken(); if (token.isValue()) { - if ("name".equals(currentFieldName)) { + if (ParseFields.NAME.match(currentFieldName)) { snapshot = parser.text(); - } else if ("index-version".equals(currentFieldName)) { + } else if (ParseFields.INDEX_VERSION.match(currentFieldName)) { + // The index-version is needed for backward compatibility with v 1.0 indexVersion = parser.longValue(); + } else if (ParseFields.START_TIME.match(currentFieldName)) { + startTime = parser.longValue(); + } else if (ParseFields.TIME.match(currentFieldName)) { + time = parser.longValue(); + } else if (ParseFields.NUMBER_OF_FILES.match(currentFieldName)) { + numberOfFiles = parser.intValue(); + } else if (ParseFields.TOTAL_SIZE.match(currentFieldName)) { + totalSize = parser.longValue(); } else { throw new ElasticsearchParseException("unknown parameter [" + currentFieldName + "]"); } } else if (token == XContentParser.Token.START_ARRAY) { - while ((parser.nextToken()) != XContentParser.Token.END_ARRAY) { - indexFiles.add(FileInfo.fromXContent(parser)); + if (ParseFields.FILES.match(currentFieldName)) { + while ((parser.nextToken()) != XContentParser.Token.END_ARRAY) { + indexFiles.add(FileInfo.fromXContent(parser)); + } + } else { + throw new ElasticsearchParseException("unknown parameter [" + currentFieldName + "]"); } } else { throw new ElasticsearchParseException("unexpected token [" + token + "]"); @@ -373,7 +461,8 @@ public class BlobStoreIndexShardSnapshot { } } } - return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.copyOf(indexFiles)); + return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.copyOf(indexFiles), + startTime, time, numberOfFiles, totalSize); } /** diff --git a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 2d160de5daa..95a948db86f 100644 --- a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -116,6 +116,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio nodeStoreFilesMetaDatas.add((NodeStoreFilesMetaData) resp); } else if (resp instanceof FailedNodeException) { failures.add((FailedNodeException) resp); + } else { + logger.warn("unknown response type [{}], expected NodeStoreFilesMetaData or FailedNodeException", resp); } } return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]), diff --git a/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 11024bbacde..67b30577bf0 100644 --- a/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -21,6 +21,7 @@ package org.elasticsearch.repositories; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.repositories.fs.FsRepository; @@ -61,6 +62,7 @@ public class RepositoriesModule extends AbstractModule { protected void configure() { bind(RepositoriesService.class).asEagerSingleton(); bind(SnapshotsService.class).asEagerSingleton(); + bind(TransportNodesSnapshotsStatus.class).asEagerSingleton(); bind(RestoreService.class).asEagerSingleton(); bind(RepositoryTypesRegistry.class).toInstance(new RepositoryTypesRegistry(ImmutableMap.copyOf(repositoryTypes))); } diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index bffbfd88077..af3b795d632 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -40,6 +40,7 @@ import org.elasticsearch.rest.action.admin.cluster.snapshots.create.RestCreateSn import org.elasticsearch.rest.action.admin.cluster.snapshots.delete.RestDeleteSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.snapshots.get.RestGetSnapshotsAction; import org.elasticsearch.rest.action.admin.cluster.snapshots.restore.RestRestoreSnapshotAction; +import org.elasticsearch.rest.action.admin.cluster.snapshots.status.RestSnapshotsStatusAction; import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.stats.RestClusterStatsAction; import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction; @@ -141,6 +142,7 @@ public class RestActionModule extends AbstractModule { bind(RestCreateSnapshotAction.class).asEagerSingleton(); bind(RestRestoreSnapshotAction.class).asEagerSingleton(); bind(RestDeleteSnapshotAction.class).asEagerSingleton(); + bind(RestSnapshotsStatusAction.class).asEagerSingleton(); bind(RestIndicesExistsAction.class).asEagerSingleton(); bind(RestTypesExistsAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/cluster/snapshots/status/RestSnapshotsStatusAction.java b/src/main/java/org/elasticsearch/rest/action/admin/cluster/snapshots/status/RestSnapshotsStatusAction.java new file mode 100644 index 00000000000..3a89d1acc50 --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/admin/cluster/snapshots/status/RestSnapshotsStatusAction.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.snapshots.status; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestXContentBuilder; + +import java.io.IOException; + +import static org.elasticsearch.client.Requests.snapshotsStatusRequest; +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * Returns status of currently running snapshot + */ +public class RestSnapshotsStatusAction extends BaseRestHandler { + + @Inject + public RestSnapshotsStatusAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(GET, "/_snapshot/{repository}/{snapshot}/_status", this); + controller.registerHandler(GET, "/_snapshot/{repository}/_status", this); + controller.registerHandler(GET, "/_snapshot/_status", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + String repository = request.param("repository"); + String[] snapshots = request.paramAsStringArray("snapshot", Strings.EMPTY_ARRAY); + if (snapshots.length == 1 && "_all".equalsIgnoreCase(snapshots[0])) { + snapshots = Strings.EMPTY_ARRAY; + } + SnapshotsStatusRequest snapshotsStatusResponse = snapshotsStatusRequest(repository).snapshots(snapshots); + + snapshotsStatusResponse.masterNodeTimeout(request.paramAsTime("master_timeout", snapshotsStatusResponse.masterNodeTimeout())); + client.admin().cluster().snapshotsStatus(snapshotsStatusResponse, new AbstractRestResponseActionListener(request, channel, logger) { + @Override + public void onResponse(SnapshotsStatusResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (IOException e) { + onFailure(e); + } + } + }); + } +} diff --git a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 641d5c2a255..f4f945f24d1 100644 --- a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.IndicesService; @@ -365,6 +366,128 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL } } + /** + * Returns status of the currently running snapshots + *

+ * This method is executed on master node + *

+ * + * @param repository repository id + * @param snapshots optional list of snapshots that will be used as a filter + * @return list of metadata for currently running snapshots + */ + public ImmutableList currentSnapshots(String repository, String[] snapshots) { + MetaData metaData = clusterService.state().metaData(); + SnapshotMetaData snapshotMetaData = metaData.custom(SnapshotMetaData.TYPE); + if (snapshotMetaData == null || snapshotMetaData.entries().isEmpty()) { + return ImmutableList.of(); + } + if (repository == null) { + return snapshotMetaData.entries(); + } + if (snapshotMetaData.entries().size() == 1) { + // Most likely scenario - one snapshot is currently running + // Check this snapshot against the query + SnapshotMetaData.Entry entry = snapshotMetaData.entries().get(0); + if (!entry.snapshotId().getRepository().equals(repository)) { + return ImmutableList.of(); + } + if (snapshots != null && snapshots.length > 0) { + for (String snapshot : snapshots) { + if (entry.snapshotId().getSnapshot().equals(snapshot)) { + return snapshotMetaData.entries(); + } + } + return ImmutableList.of(); + } else { + return snapshotMetaData.entries(); + } + } + ImmutableList.Builder builder = ImmutableList.builder(); + for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) { + if (!entry.snapshotId().getRepository().equals(repository)) { + continue; + } + if (snapshots != null && snapshots.length > 0) { + for (String snapshot : snapshots) { + if (entry.snapshotId().getSnapshot().equals(snapshot)) { + builder.add(entry); + break; + } + } + } else { + builder.add(entry); + } + } + return builder.build(); + } + + /** + * Returns status of shards that are snapshotted on the node and belong to the given snapshot + *

+ * This method is executed on data node + *

+ * + * @param snapshotId snapshot id + * @return map of shard id to snapshot status + */ + public ImmutableMap currentSnapshotShards(SnapshotId snapshotId) { + SnapshotShards snapshotShards = shardSnapshots.get(snapshotId); + if (snapshotShards == null) { + return null; + } else { + return snapshotShards.shards; + } + } + + /** + * Returns status of shards currently finished snapshots + *

+ * This method is executed on master node and it's complimentary to the {@link #currentSnapshotShards(SnapshotId)} becuase it + * returns simliar information but for already finished snapshots. + *

+ * + * @param snapshotId snapshot id + * @return map of shard id to snapshot status + */ + public ImmutableMap snapshotShards(SnapshotId snapshotId) { + ImmutableMap.Builder shardStatusBuilder = ImmutableMap.builder(); + Repository repository = repositoriesService.repository(snapshotId.getRepository()); + IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(snapshotId.getRepository()); + Snapshot snapshot = repository.readSnapshot(snapshotId); + MetaData metaData = repository.readSnapshotMetaData(snapshotId, snapshot.indices()); + for (String index : snapshot.indices()) { + IndexMetaData indexMetaData = metaData.indices().get(index); + if (indexMetaData != null) { + int numberOfShards = indexMetaData.getNumberOfShards(); + for (int i = 0; i < numberOfShards; i++) { + ShardId shardId = new ShardId(index, i); + SnapshotShardFailure shardFailure = findShardFailure(snapshot.shardFailures(), shardId); + if (shardFailure != null) { + IndexShardSnapshotStatus shardSnapshotStatus = new IndexShardSnapshotStatus(); + shardSnapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); + shardSnapshotStatus.failure(shardFailure.reason()); + shardStatusBuilder.put(shardId, shardSnapshotStatus); + } else { + IndexShardSnapshotStatus shardSnapshotStatus = indexShardRepository.snapshotStatus(snapshotId, shardId); + shardStatusBuilder.put(shardId, shardSnapshotStatus); + } + } + } + } + return shardStatusBuilder.build(); + } + + + private SnapshotShardFailure findShardFailure(ImmutableList shardFailures, ShardId shardId) { + for (SnapshotShardFailure shardFailure : shardFailures) { + if (shardId.getIndex().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) { + return shardFailure; + } + } + return null; + } + @Override public void clusterChanged(ClusterChangedEvent event) { try { diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index daedfca5f4e..64f732736c6 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -28,12 +28,17 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.*; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.SnapshotMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -375,7 +380,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { ImmutableSettings.settingsBuilder() .put("location", newTempDir(LifecycleScope.TEST)) .put("random", randomAsciiOfLength(10)) - .put("random_data_file_io_exception_rate", 0.1))); + .put("random_data_file_io_exception_rate", 0.3))); createIndex("test-idx"); ensureGreen(); @@ -390,9 +395,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) { + logger.info("--> no failures"); // If we are here, that means we didn't have any failures, let's check it assertThat(getFailureCount("test-repo"), equalTo(0L)); } else { + logger.info("--> some failures"); assertThat(getFailureCount("test-repo"), greaterThan(0L)); assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0)); for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) { @@ -404,6 +411,28 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0); assertThat(snapshotInfo.shardFailures().size(), greaterThan(0)); assertThat(snapshotInfo.totalShards(), greaterThan(snapshotInfo.successfulShards())); + + // Verify that snapshot status also contains the same failures + SnapshotsStatusResponse snapshotsStatusResponse = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").get(); + assertThat(snapshotsStatusResponse.getSnapshots().size(), equalTo(1)); + SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0); + assertThat(snapshotStatus.getIndices().size(), equalTo(1)); + SnapshotIndexStatus indexStatus = snapshotStatus.getIndices().get("test-idx"); + assertThat(indexStatus, notNullValue()); + assertThat(indexStatus.getShardsStats().getFailedShards(), equalTo(snapshotInfo.failedShards())); + assertThat(indexStatus.getShardsStats().getDoneShards(), equalTo(snapshotInfo.successfulShards())); + assertThat(indexStatus.getShards().size(), equalTo(snapshotInfo.totalShards())); + + int numberOfFailures = 0; + for (SnapshotIndexShardStatus shardStatus : indexStatus.getShards().values()) { + if (shardStatus.getStage() == SnapshotIndexShardStage.FAILURE) { + assertThat(shardStatus.getFailure(), notNullValue()); + numberOfFailures++; + } else { + assertThat(shardStatus.getFailure(), nullValue()); + } + } + assertThat(indexStatus.getShardsStats().getFailedShards(), equalTo(numberOfFailures)); } } @@ -972,6 +1001,84 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { } } + + @Test + @TestLogging("cluster.routing.allocation.decider:TRACE") + public void snapshotStatusTest() throws Exception { + Client client = client(); + File repositoryLocation = newTempDir(LifecycleScope.TEST); + logger.info("--> creating repository"); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType(MockRepositoryModule.class.getCanonicalName()).setSettings( + ImmutableSettings.settingsBuilder() + .put("location", repositoryLocation) + .put("random", randomAsciiOfLength(10)) + .put("wait_after_unblock", 200) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + // Create index on 2 nodes and make sure each node has a primary by setting no replicas + assertAcked(prepareCreate("test-idx", 2, ImmutableSettings.builder().put("number_of_replicas", 0))); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + + // Pick one node and block it + String blockedNode = blockNodeWithIndex("test-idx"); + + + logger.info("--> snapshot"); + client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); + + logger.info("--> waiting for block to kick in"); + waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + + logger.info("--> execution was blocked on node [{}], checking snapshot status", blockedNode); + SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus("test-repo").execute().actionGet(); + + logger.info("--> unblocking blocked node"); + unblockNode(blockedNode); + + assertThat(response.getSnapshots().size(), equalTo(1)); + SnapshotStatus snapshotStatus = response.getSnapshots().get(0); + assertThat(snapshotStatus.getState(), equalTo(SnapshotMetaData.State.STARTED)); + // We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage + assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0)); + for( SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) { + if (shardStatus.getStage() == SnapshotIndexShardStage.STARTED) { + assertThat(shardStatus.getNodeId(), notNullValue()); + } + } + + SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); + logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); + logger.info("--> done"); + + + logger.info("--> checking snapshot status again after snapshot is done", blockedNode); + response = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").execute().actionGet(); + snapshotStatus = response.getSnapshots().get(0); + assertThat(snapshotStatus.getIndices().size(), equalTo(1)); + SnapshotIndexStatus indexStatus = snapshotStatus.getIndices().get("test-idx"); + assertThat(indexStatus, notNullValue()); + assertThat(indexStatus.getShardsStats().getInitializingShards(), equalTo(0)); + assertThat(indexStatus.getShardsStats().getFailedShards(), equalTo(snapshotInfo.failedShards())); + assertThat(indexStatus.getShardsStats().getDoneShards(), equalTo(snapshotInfo.successfulShards())); + assertThat(indexStatus.getShards().size(), equalTo(snapshotInfo.totalShards())); + + try { + client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap-doesnt-exist").execute().actionGet(); + fail(); + } catch (SnapshotMissingException ex) { + // Expected + } + + } + private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < timeout.millis()) {