NPE in ShardStats when routing entry is not set yet on IndexShard

closes #7356
This commit is contained in:
Shay Banon 2014-08-20 12:43:26 -07:00
parent abdbfe768b
commit 2f3a041070
5 changed files with 20 additions and 8 deletions

View File

@ -117,12 +117,11 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService.indices().values()) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry().active()) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
shardsStats.add(new ShardStats(indexShard, SHARD_STATS_FLAGS));
shardsStats.add(new ShardStats(indexShard, indexShard.routingEntry(), SHARD_STATS_FLAGS));
}
}
}
ClusterHealthStatus clusterStatus = null;

View File

@ -43,9 +43,9 @@ public class ShardStats extends BroadcastShardOperationResponse implements ToXCo
ShardStats() {
}
public ShardStats(IndexShard indexShard, CommonStatsFlags flags) {
super(indexShard.routingEntry().shardId());
this.shardRouting = indexShard.routingEntry();
public ShardStats(IndexShard indexShard, ShardRouting shardRouting, CommonStatsFlags flags) {
super(indexShard.shardId());
this.shardRouting = shardRouting;
this.stats = new CommonStats(indexShard, flags);
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
@ -135,6 +136,10 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
protected ShardStats shardOperation(IndexShardStatsRequest request) throws ElasticsearchException {
InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.shardId().getIndex());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId().id());
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet
if (indexShard.routingEntry() == null) {
throw new IndexShardMissingException(indexShard.shardId());
}
CommonStatsFlags flags = new CommonStatsFlags().clear();
@ -197,7 +202,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
flags.set(CommonStatsFlags.Flag.QueryCache);
}
return new ShardStats(indexShard, flags);
return new ShardStats(indexShard, indexShard.routingEntry(), flags);
}
static class IndexShardStatsRequest extends BroadcastShardOperationRequest {

View File

@ -82,6 +82,11 @@ public interface IndexShard extends IndexShardComponent {
ShardFieldData fieldData();
/**
* Returns the latest cluster routing entry received with this shard. Might be null if the
* shard was just created.
*/
@Nullable
ShardRouting routingEntry();
DocsStats docStats();

View File

@ -206,7 +206,10 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
for (IndexService indexService : indices.values()) {
for (IndexShard indexShard : indexService) {
try {
IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard, flags) });
if (indexShard.routingEntry() == null) {
continue;
}
IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard, indexShard.routingEntry(), flags) });
if (!statsByShard.containsKey(indexService.index())) {
statsByShard.put(indexService.index(), Lists.<IndexShardStats>newArrayList(indexShardStats));
} else {