Add node name & ip to _cat/recovery.

Closes #4016.
This commit is contained in:
Andrew Raines 2013-10-30 12:37:46 -05:00
parent 8819f91d47
commit b2e8ec2924
1 changed files with 60 additions and 40 deletions

View File

@ -20,15 +20,19 @@
package org.elasticsearch.rest.action.cat; package org.elasticsearch.rest.action.cat;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest; import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse; import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.status.ShardStatus; import org.elasticsearch.action.admin.indices.status.ShardStatus;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table; import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.rest.*; import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestTable; import org.elasticsearch.rest.action.support.RestTable;
@ -54,38 +58,57 @@ public class RestRecoveryAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel) { public void handleRequest(final RestRequest request, final RestChannel channel) {
String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(indices); final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
indicesStatusRequest.recovery(true); clusterStateRequest.filterMetaData(true);
indicesStatusRequest.operationThreading(BroadcastOperationThreading.SINGLE_THREAD); clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
client.admin().indices().status(indicesStatusRequest, new ActionListener<IndicesStatusResponse>() { client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override @Override
public void onResponse(IndicesStatusResponse indicesStatusResponse) { public void onResponse(final ClusterStateResponse clusterStateResponse) {
Map<String, Long> primarySizes = new HashMap<String, Long>(); IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(indices);
Set<ShardStatus> replicas = new HashSet<ShardStatus>(); indicesStatusRequest.recovery(true);
indicesStatusRequest.operationThreading(BroadcastOperationThreading.SINGLE_THREAD);
// Loop through all the shards in the index status, keeping client.admin().indices().status(indicesStatusRequest, new ActionListener<IndicesStatusResponse>() {
// track of the primary shard size with a Map and the @Override
// recovering shards in a Set of ShardStatus objects public void onResponse(IndicesStatusResponse indicesStatusResponse) {
for (ShardStatus shardStatus : indicesStatusResponse.getShards()) { Map<String, Long> primarySizes = new HashMap<String, Long>();
if (shardStatus.getShardRouting().primary()) { Set<ShardStatus> replicas = new HashSet<ShardStatus>();
primarySizes.put(shardStatus.getShardRouting().getIndex() + shardStatus.getShardRouting().getId(),
shardStatus.getStoreSize().bytes());
} else if (shardStatus.getState() == IndexShardState.RECOVERING) {
replicas.add(shardStatus);
}
}
try { // Loop through all the shards in the index status, keeping
channel.sendResponse(RestTable.buildResponse(buildRecoveryTable(primarySizes, replicas), request, channel)); // track of the primary shard size with a Map and the
} catch (Throwable e) { // recovering shards in a Set of ShardStatus objects
try { for (ShardStatus shardStatus : indicesStatusResponse.getShards()) {
channel.sendResponse(new XContentThrowableRestResponse(request, e)); if (shardStatus.getShardRouting().primary()) {
} catch (IOException e2) { primarySizes.put(shardStatus.getShardRouting().getIndex() + shardStatus.getShardRouting().getId(),
logger.error("Unable to send recovery status response", e2); shardStatus.getStoreSize().bytes());
} else if (shardStatus.getState() == IndexShardState.RECOVERING) {
replicas.add(shardStatus);
}
}
try {
channel.sendResponse(RestTable.buildResponse(buildRecoveryTable(clusterStateResponse, primarySizes, replicas), request, channel));
} catch (Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e2) {
logger.error("Unable to send recovery status response", e2);
}
}
} }
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
} }
@Override @Override
@ -103,20 +126,24 @@ public class RestRecoveryAction extends BaseRestHandler {
/** /**
* buildRecoveryTable will build a table of recovery information suitable * buildRecoveryTable will build a table of recovery information suitable
* for displaying at the command line. * for displaying at the command line.
* @param state Current cluster state.
* @param primarySizes A Map of {@code index + shardId} strings to store size for all primary shards. * @param primarySizes A Map of {@code index + shardId} strings to store size for all primary shards.
* @param recoveringReplicas A Set of {@link ShardStatus} objects for each recovering replica to be displayed. * @param recoveringReplicas A Set of {@link ShardStatus} objects for each recovering replica to be displayed.
* @return A table containing index, shardId, node, target size, recovered size and percentage for each recovering replica * @return A table containing index, shardId, node, target size, recovered size and percentage for each recovering replica
*/ */
public static Table buildRecoveryTable(Map<String, Long> primarySizes, Set<ShardStatus> recoveringReplicas) { public static Table buildRecoveryTable(ClusterStateResponse state, Map<String, Long> primarySizes, Set<ShardStatus> recoveringReplicas) {
Table t = new Table(); Table t = new Table();
t.startHeaders().addCell("index") t.startHeaders().addCell("index")
.addCell("shard") .addCell("shard")
.addCell("node")
.addCell("target", "text-align:right;") .addCell("target", "text-align:right;")
.addCell("recovered", "text-align:right;") .addCell("recovered", "text-align:right;")
.addCell("%", "text-align:right;") .addCell("%", "text-align:right;")
.addCell("ip")
.addCell("node")
.endHeaders(); .endHeaders();
for (ShardStatus status : recoveringReplicas) { for (ShardStatus status : recoveringReplicas) {
DiscoveryNode node = state.getState().nodes().get(status.getShardRouting().currentNodeId());
String index = status.getShardRouting().getIndex(); String index = status.getShardRouting().getIndex();
int id = status.getShardId(); int id = status.getShardId();
long replicaSize = status.getStoreSize().bytes(); long replicaSize = status.getStoreSize().bytes();
@ -124,18 +151,11 @@ public class RestRecoveryAction extends BaseRestHandler {
t.startRow(); t.startRow();
t.addCell(index); t.addCell(index);
t.addCell(id); t.addCell(id);
t.addCell(status.getShardRouting().currentNodeId()); t.addCell(primarySize);
if (primarySize == null) {
t.addCell("NaN");
} else {
t.addCell(primarySize);
}
t.addCell(replicaSize); t.addCell(replicaSize);
if (primarySize == null) { t.addCell(primarySize == null ? null : String.format(Locale.ROOT, "%1.1f%%", 100.0 * (float)replicaSize / primarySize));
t.addCell("NaN"); t.addCell(node == null ? null : ((InetSocketTransportAddress) node.address()).address().getAddress().getHostAddress());
} else { t.addCell(node == null ? null : node.name());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", 100.0 * (float)replicaSize / primarySize));
}
t.endRow(); t.endRow();
} }
return t; return t;