diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java index 483f0b1f2e1..0f5c3979618 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java @@ -20,15 +20,19 @@ package org.elasticsearch.rest.action.cat; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest; import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse; import org.elasticsearch.action.admin.indices.status.ShardStatus; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.Table; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestTable; @@ -54,38 +58,57 @@ public class RestRecoveryAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { - String[] indices = Strings.splitStringByCommaToArray(request.param("index")); - IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(indices); - indicesStatusRequest.recovery(true); - indicesStatusRequest.operationThreading(BroadcastOperationThreading.SINGLE_THREAD); + final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.filterMetaData(true); + clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); + clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); - client.admin().indices().status(indicesStatusRequest, new ActionListener() { + client.admin().cluster().state(clusterStateRequest, new ActionListener() { @Override - public void onResponse(IndicesStatusResponse indicesStatusResponse) { - Map primarySizes = new HashMap(); - Set replicas = new HashSet(); + public void onResponse(final ClusterStateResponse clusterStateResponse) { + IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(indices); + indicesStatusRequest.recovery(true); + indicesStatusRequest.operationThreading(BroadcastOperationThreading.SINGLE_THREAD); - // Loop through all the shards in the index status, keeping - // track of the primary shard size with a Map and the - // recovering shards in a Set of ShardStatus objects - for (ShardStatus shardStatus : indicesStatusResponse.getShards()) { - if (shardStatus.getShardRouting().primary()) { - primarySizes.put(shardStatus.getShardRouting().getIndex() + shardStatus.getShardRouting().getId(), - shardStatus.getStoreSize().bytes()); - } else if (shardStatus.getState() == IndexShardState.RECOVERING) { - replicas.add(shardStatus); - } - } + client.admin().indices().status(indicesStatusRequest, new ActionListener() { + @Override + public void onResponse(IndicesStatusResponse indicesStatusResponse) { + Map primarySizes = new HashMap(); + Set replicas = new HashSet(); - try { - channel.sendResponse(RestTable.buildResponse(buildRecoveryTable(primarySizes, replicas), request, channel)); - } catch (Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e2) { - logger.error("Unable to send recovery status response", e2); + // Loop through all the shards in the index status, keeping + // track of the primary shard size with a Map and the + // recovering shards in a Set of ShardStatus objects + for (ShardStatus shardStatus : indicesStatusResponse.getShards()) { + if (shardStatus.getShardRouting().primary()) { + primarySizes.put(shardStatus.getShardRouting().getIndex() + shardStatus.getShardRouting().getId(), + shardStatus.getStoreSize().bytes()); + } else if (shardStatus.getState() == IndexShardState.RECOVERING) { + replicas.add(shardStatus); + } + } + + try { + channel.sendResponse(RestTable.buildResponse(buildRecoveryTable(clusterStateResponse, primarySizes, replicas), request, channel)); + } catch (Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e2) { + logger.error("Unable to send recovery status response", e2); + } + } } - } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); } @Override @@ -103,20 +126,24 @@ public class RestRecoveryAction extends BaseRestHandler { /** * buildRecoveryTable will build a table of recovery information suitable * for displaying at the command line. + * @param state Current cluster state. * @param primarySizes A Map of {@code index + shardId} strings to store size for all primary shards. * @param recoveringReplicas A Set of {@link ShardStatus} objects for each recovering replica to be displayed. * @return A table containing index, shardId, node, target size, recovered size and percentage for each recovering replica */ - public static Table buildRecoveryTable(Map primarySizes, Set recoveringReplicas) { + public static Table buildRecoveryTable(ClusterStateResponse state, Map primarySizes, Set recoveringReplicas) { Table t = new Table(); t.startHeaders().addCell("index") .addCell("shard") - .addCell("node") .addCell("target", "text-align:right;") .addCell("recovered", "text-align:right;") .addCell("%", "text-align:right;") + .addCell("ip") + .addCell("node") .endHeaders(); for (ShardStatus status : recoveringReplicas) { + DiscoveryNode node = state.getState().nodes().get(status.getShardRouting().currentNodeId()); + String index = status.getShardRouting().getIndex(); int id = status.getShardId(); long replicaSize = status.getStoreSize().bytes(); @@ -124,18 +151,11 @@ public class RestRecoveryAction extends BaseRestHandler { t.startRow(); t.addCell(index); t.addCell(id); - t.addCell(status.getShardRouting().currentNodeId()); - if (primarySize == null) { - t.addCell("NaN"); - } else { - t.addCell(primarySize); - } + t.addCell(primarySize); t.addCell(replicaSize); - if (primarySize == null) { - t.addCell("NaN"); - } else { - t.addCell(String.format(Locale.ROOT, "%1.1f%%", 100.0 * (float)replicaSize / primarySize)); - } + t.addCell(primarySize == null ? null : String.format(Locale.ROOT, "%1.1f%%", 100.0 * (float)replicaSize / primarySize)); + t.addCell(node == null ? null : ((InetSocketTransportAddress) node.address()).address().getAddress().getHostAddress()); + t.addCell(node == null ? null : node.name()); t.endRow(); } return t;