diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 837a9efa6ce..97573c0d312 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectIntOpenHashMap; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import org.elasticsearch.cluster.ClusterState; @@ -97,7 +98,7 @@ public class RoutingNodes implements Iterable { } MutableShardRouting sr = new MutableShardRouting(shard); entries.add(sr); - activeShardsAdd(sr); + assignedShardsAdd(sr); if (shard.relocating()) { entries = nodesToShards.get(shard.relocatingNodeId()); relocatingShards++; @@ -110,7 +111,7 @@ public class RoutingNodes implements Iterable { sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(), shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version()); entries.add(sr); - activeShardsAdd(sr); + assignedShardsAdd(sr); } else if (!shard.active()) { // shards that are initializing without being relocated if (shard.primary()) { inactivePrimaryCount++; @@ -119,7 +120,7 @@ public class RoutingNodes implements Iterable { } } else { MutableShardRouting sr = new MutableShardRouting(shard); - activeShardsAdd(sr); + assignedShardsAdd(sr); unassignedShards.add(sr); } } @@ -251,22 +252,40 @@ public class RoutingNodes implements Iterable { */ public MutableShardRouting activePrimary(ShardRouting shard) { assert !shard.primary(); - for (MutableShardRouting shardRouting : activeShards(shard.shardId())) { - if (shardRouting.primary()) { - if (shardRouting.active()) { - return shardRouting; - } - break; + for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) { + if (shardRouting.primary() && shardRouting.active()) { + return shardRouting; } } return null; } + /** + * Returns one active replica shard for the given ShardRouting shard ID or null if + * no active replica is found. + */ + public MutableShardRouting activeReplica(ShardRouting shard) { + for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) { + if (!shardRouting.primary() && shardRouting.active()) { + return shardRouting; + } + } + return null; + } + + /** + * Returns all shards that are not in the state UNASSIGNED with the same shard + * ID as the given shard. + */ + public Iterable assignedShards(ShardRouting shard) { + return Iterables.unmodifiableIterable(assignedShards(shard.shardId())); + } + /** * Returns true iff all replicas are active for the given shard routing. Otherwise false */ public boolean allReplicasActive(ShardRouting shardRouting) { - final Set shards = activeShards(shardRouting.shardId()); + final Set shards = assignedShards(shardRouting.shardId()); if (shards.isEmpty() || shards.size() < this.routingTable.index(shardRouting.index()).shard(shardRouting.id()).size()) { return false; // if we are empty nothing is active if we have less than total at least one is unassigned } @@ -352,7 +371,7 @@ public class RoutingNodes implements Iterable { if (shard.state() == ShardRoutingState.RELOCATING) { relocatingShards++; } - activeShardsAdd(shard); + assignedShardsAdd(shard); } /** @@ -410,7 +429,7 @@ public class RoutingNodes implements Iterable { private static final Set EMPTY = Collections.emptySet(); - private Set activeShards(ShardId shardId) { + private Set assignedShards(ShardId shardId) { final Set replicaSet = assignedShards.get(shardId); return replicaSet == null ? EMPTY : Collections.unmodifiableSet(replicaSet); } @@ -430,10 +449,10 @@ public class RoutingNodes implements Iterable { } else if (shard.relocating()) { cancelRelocation(shard); } - activeShardsRemove(shard); + assignedShardsRemove(shard); } - private void activeShardsAdd(MutableShardRouting shard) { + private void assignedShardsAdd(MutableShardRouting shard) { if (shard.unassigned()) { // no unassigned return; @@ -446,7 +465,7 @@ public class RoutingNodes implements Iterable { replicaSet.add(shard); } - private void activeShardsRemove(MutableShardRouting shard) { + private void assignedShardsRemove(MutableShardRouting shard) { Set replicaSet = assignedShards.get(shard.shardId()); if (replicaSet != null) { if (replicaSet.contains(shard)) { @@ -488,6 +507,7 @@ public class RoutingNodes implements Iterable { public final static class UnassignedShards implements Iterable { private final List unassigned; + private int primaries = 0; private long transactionId = 0; private final UnassignedShards source; @@ -581,10 +601,6 @@ public class RoutingNodes implements Iterable { return new UnassignedShards(this); } - public void copyAll(Collection others) { - others.addAll(unassigned); - } - public MutableShardRouting[] drain() { MutableShardRouting[] mutableShardRoutings = unassigned.toArray(new MutableShardRouting[unassigned.size()]); unassigned.clear(); @@ -649,7 +665,7 @@ public class RoutingNodes implements Iterable { } } } - Set mutableShardRoutings = routingNodes.activeShards(new ShardId(index, i)); + Set mutableShardRoutings = routingNodes.assignedShards(new ShardId(index, i)); for (MutableShardRouting r : mutableShardRoutings) { assert shards.contains(r); shards.remove(r); diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5a765b297ec..0dc217b3952 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -246,65 +246,52 @@ public class AllocationService extends AbstractComponent { private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) { boolean changed = false; RoutingNodes routingNodes = allocation.routingNodes(); + if (!routingNodes.hasUnassignedPrimaries()) { + // move out if we don't have unassigned primaries + return changed; + } for (MutableShardRouting shardEntry : routingNodes.unassigned()) { - if (shardEntry.primary() && !shardEntry.assignedToNode()) { - boolean elected = false; - // primary and not assigned, go over and find a replica that is assigned and active (since it might be relocating) - for (RoutingNode routingNode : routingNodes) { - - for (MutableShardRouting shardEntry2 : routingNode) { - if (shardEntry.shardId().equals(shardEntry2.shardId()) && shardEntry2.active()) { - assert shardEntry2.assignedToNode(); - assert !shardEntry2.primary(); - - changed = true; - routingNodes.swapPrimaryFlag(shardEntry, shardEntry2); - - if (shardEntry2.relocatingNodeId() != null) { - // its also relocating, make sure to move the other routing to primary - RoutingNode node = routingNodes.node(shardEntry2.relocatingNodeId()); - if (node != null) { - for (MutableShardRouting shardRouting : node) { - if (shardRouting.shardId().equals(shardEntry2.shardId()) && !shardRouting.primary()) { - routingNodes.swapPrimaryFlag(shardRouting); - break; - } - } + if (shardEntry.primary()) { + MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry); + if (candidate != null) { + routingNodes.swapPrimaryFlag(shardEntry, candidate); + if (candidate.relocatingNodeId() != null) { + changed = true; + // its also relocating, make sure to move the other routing to primary + RoutingNode node = routingNodes.node(candidate.relocatingNodeId()); + if (node != null) { + for (MutableShardRouting shardRouting : node) { + if (shardRouting.shardId().equals(candidate.shardId()) && !shardRouting.primary()) { + routingNodes.swapPrimaryFlag(shardRouting); + break; } } - - elected = true; - break; } } - - if (elected) { - break; - } } } } // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones... List shardsToFail = null; - for (MutableShardRouting shardEntry : routingNodes.unassigned()) { - if (shardEntry.primary() && !shardEntry.assignedToNode()) { - for (RoutingNode routingNode : routingNodes) { - for (MutableShardRouting shardEntry2 : routingNode) { - if (shardEntry.shardId().equals(shardEntry2.shardId()) && !shardEntry2.active()) { + if (routingNodes.hasUnassignedPrimaries()) { + for (MutableShardRouting shardEntry : routingNodes.unassigned()) { + if (shardEntry.primary()) { + for(MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) { + if (!routing.primary()) { changed = true; if (shardsToFail == null) { shardsToFail = new ArrayList(); } - shardsToFail.add(shardEntry2); + shardsToFail.add(routing); } } } } - } - if (shardsToFail != null) { - for (ShardRouting shardToFail : shardsToFail) { - applyFailedShard(allocation, shardToFail, false); + if (shardsToFail != null) { + for (ShardRouting shardToFail : shardsToFail) { + applyFailedShard(allocation, shardToFail, false); + } } } return changed;