Use existing datastructures from RoutingNodes to elect unassigned primaries

Currently we trying to find a replica for a primary that is allocated by
running through all shards in the cluster while RoutingNodes already has
a datastructure keyed by shard ID for this. We should lookup this
directly rather than using linear probing. This improves shard allocation performance
by 5x.
This commit is contained in:
Simon Willnauer 2013-12-18 11:38:29 +01:00
parent 62104a10ef
commit 314499cee0
2 changed files with 63 additions and 60 deletions

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectIntOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -97,7 +98,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
} }
MutableShardRouting sr = new MutableShardRouting(shard); MutableShardRouting sr = new MutableShardRouting(shard);
entries.add(sr); entries.add(sr);
activeShardsAdd(sr); assignedShardsAdd(sr);
if (shard.relocating()) { if (shard.relocating()) {
entries = nodesToShards.get(shard.relocatingNodeId()); entries = nodesToShards.get(shard.relocatingNodeId());
relocatingShards++; relocatingShards++;
@ -110,7 +111,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(), sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(),
shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version()); shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version());
entries.add(sr); entries.add(sr);
activeShardsAdd(sr); assignedShardsAdd(sr);
} else if (!shard.active()) { // shards that are initializing without being relocated } else if (!shard.active()) { // shards that are initializing without being relocated
if (shard.primary()) { if (shard.primary()) {
inactivePrimaryCount++; inactivePrimaryCount++;
@ -119,7 +120,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
} }
} else { } else {
MutableShardRouting sr = new MutableShardRouting(shard); MutableShardRouting sr = new MutableShardRouting(shard);
activeShardsAdd(sr); assignedShardsAdd(sr);
unassignedShards.add(sr); unassignedShards.add(sr);
} }
} }
@ -251,22 +252,40 @@ public class RoutingNodes implements Iterable<RoutingNode> {
*/ */
public MutableShardRouting activePrimary(ShardRouting shard) { public MutableShardRouting activePrimary(ShardRouting shard) {
assert !shard.primary(); assert !shard.primary();
for (MutableShardRouting shardRouting : activeShards(shard.shardId())) { for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) {
if (shardRouting.primary()) { if (shardRouting.primary() && shardRouting.active()) {
if (shardRouting.active()) {
return shardRouting; return shardRouting;
} }
break;
}
} }
return null; return null;
} }
/**
* Returns one active replica shard for the given ShardRouting shard ID or <code>null</code> if
* no active replica is found.
*/
public MutableShardRouting activeReplica(ShardRouting shard) {
for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) {
if (!shardRouting.primary() && shardRouting.active()) {
return shardRouting;
}
}
return null;
}
/**
* Returns all shards that are not in the state UNASSIGNED with the same shard
* ID as the given shard.
*/
public Iterable<MutableShardRouting> assignedShards(ShardRouting shard) {
return Iterables.unmodifiableIterable(assignedShards(shard.shardId()));
}
/** /**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code> * Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/ */
public boolean allReplicasActive(ShardRouting shardRouting) { public boolean allReplicasActive(ShardRouting shardRouting) {
final Set<MutableShardRouting> shards = activeShards(shardRouting.shardId()); final Set<MutableShardRouting> shards = assignedShards(shardRouting.shardId());
if (shards.isEmpty() || shards.size() < this.routingTable.index(shardRouting.index()).shard(shardRouting.id()).size()) { if (shards.isEmpty() || shards.size() < this.routingTable.index(shardRouting.index()).shard(shardRouting.id()).size()) {
return false; // if we are empty nothing is active if we have less than total at least one is unassigned return false; // if we are empty nothing is active if we have less than total at least one is unassigned
} }
@ -352,7 +371,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (shard.state() == ShardRoutingState.RELOCATING) { if (shard.state() == ShardRoutingState.RELOCATING) {
relocatingShards++; relocatingShards++;
} }
activeShardsAdd(shard); assignedShardsAdd(shard);
} }
/** /**
@ -410,7 +429,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private static final Set<MutableShardRouting> EMPTY = Collections.emptySet(); private static final Set<MutableShardRouting> EMPTY = Collections.emptySet();
private Set<MutableShardRouting> activeShards(ShardId shardId) { private Set<MutableShardRouting> assignedShards(ShardId shardId) {
final Set<MutableShardRouting> replicaSet = assignedShards.get(shardId); final Set<MutableShardRouting> replicaSet = assignedShards.get(shardId);
return replicaSet == null ? EMPTY : Collections.unmodifiableSet(replicaSet); return replicaSet == null ? EMPTY : Collections.unmodifiableSet(replicaSet);
} }
@ -430,10 +449,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
} else if (shard.relocating()) { } else if (shard.relocating()) {
cancelRelocation(shard); cancelRelocation(shard);
} }
activeShardsRemove(shard); assignedShardsRemove(shard);
} }
private void activeShardsAdd(MutableShardRouting shard) { private void assignedShardsAdd(MutableShardRouting shard) {
if (shard.unassigned()) { if (shard.unassigned()) {
// no unassigned // no unassigned
return; return;
@ -446,7 +465,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
replicaSet.add(shard); replicaSet.add(shard);
} }
private void activeShardsRemove(MutableShardRouting shard) { private void assignedShardsRemove(MutableShardRouting shard) {
Set<MutableShardRouting> replicaSet = assignedShards.get(shard.shardId()); Set<MutableShardRouting> replicaSet = assignedShards.get(shard.shardId());
if (replicaSet != null) { if (replicaSet != null) {
if (replicaSet.contains(shard)) { if (replicaSet.contains(shard)) {
@ -488,6 +507,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
public final static class UnassignedShards implements Iterable<MutableShardRouting> { public final static class UnassignedShards implements Iterable<MutableShardRouting> {
private final List<MutableShardRouting> unassigned; private final List<MutableShardRouting> unassigned;
private int primaries = 0; private int primaries = 0;
private long transactionId = 0; private long transactionId = 0;
private final UnassignedShards source; private final UnassignedShards source;
@ -581,10 +601,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return new UnassignedShards(this); return new UnassignedShards(this);
} }
public void copyAll(Collection<MutableShardRouting> others) {
others.addAll(unassigned);
}
public MutableShardRouting[] drain() { public MutableShardRouting[] drain() {
MutableShardRouting[] mutableShardRoutings = unassigned.toArray(new MutableShardRouting[unassigned.size()]); MutableShardRouting[] mutableShardRoutings = unassigned.toArray(new MutableShardRouting[unassigned.size()]);
unassigned.clear(); unassigned.clear();
@ -649,7 +665,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
} }
} }
} }
Set<MutableShardRouting> mutableShardRoutings = routingNodes.activeShards(new ShardId(index, i)); Set<MutableShardRouting> mutableShardRoutings = routingNodes.assignedShards(new ShardId(index, i));
for (MutableShardRouting r : mutableShardRoutings) { for (MutableShardRouting r : mutableShardRoutings) {
assert shards.contains(r); assert shards.contains(r);
shards.remove(r); shards.remove(r);

View File

@ -246,58 +246,44 @@ public class AllocationService extends AbstractComponent {
private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) { private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false; boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes(); RoutingNodes routingNodes = allocation.routingNodes();
if (!routingNodes.hasUnassignedPrimaries()) {
// move out if we don't have unassigned primaries
return changed;
}
for (MutableShardRouting shardEntry : routingNodes.unassigned()) { for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary() && !shardEntry.assignedToNode()) { if (shardEntry.primary()) {
boolean elected = false; MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
// primary and not assigned, go over and find a replica that is assigned and active (since it might be relocating) if (candidate != null) {
for (RoutingNode routingNode : routingNodes) { routingNodes.swapPrimaryFlag(shardEntry, candidate);
if (candidate.relocatingNodeId() != null) {
for (MutableShardRouting shardEntry2 : routingNode) {
if (shardEntry.shardId().equals(shardEntry2.shardId()) && shardEntry2.active()) {
assert shardEntry2.assignedToNode();
assert !shardEntry2.primary();
changed = true; changed = true;
routingNodes.swapPrimaryFlag(shardEntry, shardEntry2);
if (shardEntry2.relocatingNodeId() != null) {
// its also relocating, make sure to move the other routing to primary // its also relocating, make sure to move the other routing to primary
RoutingNode node = routingNodes.node(shardEntry2.relocatingNodeId()); RoutingNode node = routingNodes.node(candidate.relocatingNodeId());
if (node != null) { if (node != null) {
for (MutableShardRouting shardRouting : node) { for (MutableShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(shardEntry2.shardId()) && !shardRouting.primary()) { if (shardRouting.shardId().equals(candidate.shardId()) && !shardRouting.primary()) {
routingNodes.swapPrimaryFlag(shardRouting); routingNodes.swapPrimaryFlag(shardRouting);
break; break;
} }
} }
} }
} }
elected = true;
break;
}
}
if (elected) {
break;
}
} }
} }
} }
// go over and remove dangling replicas that are initializing, but we couldn't elect primary ones... // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
List<ShardRouting> shardsToFail = null; List<ShardRouting> shardsToFail = null;
if (routingNodes.hasUnassignedPrimaries()) {
for (MutableShardRouting shardEntry : routingNodes.unassigned()) { for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary() && !shardEntry.assignedToNode()) { if (shardEntry.primary()) {
for (RoutingNode routingNode : routingNodes) { for(MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
for (MutableShardRouting shardEntry2 : routingNode) { if (!routing.primary()) {
if (shardEntry.shardId().equals(shardEntry2.shardId()) && !shardEntry2.active()) {
changed = true; changed = true;
if (shardsToFail == null) { if (shardsToFail == null) {
shardsToFail = new ArrayList<ShardRouting>(); shardsToFail = new ArrayList<ShardRouting>();
} }
shardsToFail.add(shardEntry2); shardsToFail.add(routing);
}
} }
} }
} }
@ -307,6 +293,7 @@ public class AllocationService extends AbstractComponent {
applyFailedShard(allocation, shardToFail, false); applyFailedShard(allocation, shardToFail, false);
} }
} }
}
return changed; return changed;
} }