Move index health calculations to ClusterIndexHealth so it can be reused.

This commit is contained in:
Boaz Leskes 2013-12-16 21:49:47 +01:00
parent 33599d9a34
commit 9fb361cea1
3 changed files with 87 additions and 66 deletions

View File

@ -21,6 +21,10 @@ package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -69,6 +73,68 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
this.validationFailures = validationFailures;
}
public ClusterIndexHealth(IndexMetaData indexMetaData, IndexRoutingTable indexRoutingTable) {
this.index = indexMetaData.index();
this.numberOfShards = indexMetaData.getNumberOfShards();
this.numberOfReplicas = indexMetaData.getNumberOfReplicas();
this.validationFailures = indexRoutingTable.validate(indexMetaData);
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id());
for (ShardRouting shardRouting : shardRoutingTable) {
if (shardRouting.active()) {
shardHealth.activeShards++;
if (shardRouting.relocating()) {
// the shard is relocating, the one he is relocating to will be in initializing state, so we don't count it
shardHealth.relocatingShards++;
}
if (shardRouting.primary()) {
shardHealth.primaryActive = true;
}
} else if (shardRouting.initializing()) {
shardHealth.initializingShards++;
} else if (shardRouting.unassigned()) {
shardHealth.unassignedShards++;
}
}
if (shardHealth.primaryActive) {
if (shardHealth.activeShards == shardRoutingTable.size()) {
shardHealth.status = ClusterHealthStatus.GREEN;
} else {
shardHealth.status = ClusterHealthStatus.YELLOW;
}
} else {
shardHealth.status = ClusterHealthStatus.RED;
}
shards.put(shardHealth.getId(), shardHealth);
}
// update the index status
status = ClusterHealthStatus.GREEN;
for (ClusterShardHealth shardHealth : shards.values()) {
if (shardHealth.isPrimaryActive()) {
activePrimaryShards++;
}
activeShards += shardHealth.activeShards;
relocatingShards += shardHealth.relocatingShards;
initializingShards += shardHealth.initializingShards;
unassignedShards += shardHealth.unassignedShards;
if (shardHealth.getStatus() == ClusterHealthStatus.RED) {
status = ClusterHealthStatus.RED;
} else if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW && status != ClusterHealthStatus.RED) {
// do not override an existing red
status = ClusterHealthStatus.YELLOW;
}
}
if (!validationFailures.isEmpty()) {
status = ClusterHealthStatus.RED;
} else if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation)
status = ClusterHealthStatus.RED;
}
}
public String getIndex() {
return index;
}

View File

@ -28,9 +28,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTableValidation;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexMissingException;
@ -223,6 +221,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
}
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
@ -244,64 +243,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (indexRoutingTable == null) {
continue;
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), validation.indexFailures(indexMetaData.index()));
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id());
for (ShardRouting shardRouting : shardRoutingTable) {
if (shardRouting.active()) {
shardHealth.activeShards++;
if (shardRouting.relocating()) {
// the shard is relocating, the one he is relocating to will be in initializing state, so we don't count it
shardHealth.relocatingShards++;
}
if (shardRouting.primary()) {
shardHealth.primaryActive = true;
}
} else if (shardRouting.initializing()) {
shardHealth.initializingShards++;
} else if (shardRouting.unassigned()) {
shardHealth.unassignedShards++;
}
}
if (shardHealth.primaryActive) {
if (shardHealth.activeShards == shardRoutingTable.size()) {
shardHealth.status = ClusterHealthStatus.GREEN;
} else {
shardHealth.status = ClusterHealthStatus.YELLOW;
}
} else {
shardHealth.status = ClusterHealthStatus.RED;
}
indexHealth.shards.put(shardHealth.getId(), shardHealth);
}
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.isPrimaryActive()) {
indexHealth.activePrimaryShards++;
}
indexHealth.activeShards += shardHealth.activeShards;
indexHealth.relocatingShards += shardHealth.relocatingShards;
indexHealth.initializingShards += shardHealth.initializingShards;
indexHealth.unassignedShards += shardHealth.unassignedShards;
}
// update the index status
indexHealth.status = ClusterHealthStatus.GREEN;
if (!indexHealth.getValidationFailures().isEmpty()) {
indexHealth.status = ClusterHealthStatus.RED;
} else if (indexHealth.getShards().isEmpty()) { // might be since none has been created yet (two phase index creation)
indexHealth.status = ClusterHealthStatus.RED;
} else {
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.getStatus() == ClusterHealthStatus.RED) {
indexHealth.status = ClusterHealthStatus.RED;
break;
}
if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW) {
indexHealth.status = ClusterHealthStatus.YELLOW;
}
}
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetaData, indexRoutingTable);
response.indices.put(indexHealth.getIndex(), indexHealth);
}

View File

@ -21,22 +21,21 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.LongObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenLongMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
@ -124,6 +123,18 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return;
}
IndexMetaData indexMetaData = metaData.index(index());
for (String failure : validate(indexMetaData)) {
validation.addIndexFailure(index, failure);
}
}
/**
* validate based on a meta data, returning failures found
*/
public List<String> validate(IndexMetaData indexMetaData) {
ArrayList<String> failures = new ArrayList<String>();
// check the number of shards
if (indexMetaData.numberOfShards() != shards().size()) {
Set<Integer> expected = Sets.newHashSet();
@ -133,21 +144,22 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
for (IndexShardRoutingTable indexShardRoutingTable : this) {
expected.remove(indexShardRoutingTable.shardId().id());
}
validation.addIndexFailure(index(), "Wrong number of shards in routing table, missing: " + expected);
failures.add("Wrong number of shards in routing table, missing: " + expected);
}
// check the replicas
for (IndexShardRoutingTable indexShardRoutingTable : this) {
int routingNumberOfReplicas = indexShardRoutingTable.size() - 1;
if (routingNumberOfReplicas != indexMetaData.numberOfReplicas()) {
validation.addIndexFailure(index(), "Shard [" + indexShardRoutingTable.shardId().id()
failures.add("Shard [" + indexShardRoutingTable.shardId().id()
+ "] routing table has wrong number of replicas, expected [" + indexMetaData.numberOfReplicas() + "], got [" + routingNumberOfReplicas + "]");
}
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (!shardRouting.index().equals(index())) {
validation.addIndexFailure(index(), "shard routing has an index [" + shardRouting.index() + "] that is different than the routing table");
failures.add("shard routing has an index [" + shardRouting.index() + "] that is different than the routing table");
}
}
}
return failures;
}
@Override