improve local shard allocation to utilize same version shards allocation within the same replication group

This commit is contained in:
kimchy 2011-06-09 10:51:45 +03:00
parent 1a0ee00fbb
commit a07030ccf3
1 changed files with 53 additions and 20 deletions

View File

@ -27,7 +27,11 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
@ -110,8 +114,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
int numberOfAllocationsFound = 0;
long highestVersion = -1;
DiscoveryNode nodeWithHighestVersion = null;
for (TObjectLongIterator<DiscoveryNode> it = nodesState.iterator(); it.hasNext();) {
Set<DiscoveryNode> nodesWithHighestVersion = Sets.newHashSet();
for (TObjectLongIterator<DiscoveryNode> it = nodesState.iterator(); it.hasNext(); ) {
it.advance();
DiscoveryNode node = it.key();
long version = it.value();
@ -122,12 +126,15 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
if (version != -1) {
numberOfAllocationsFound++;
if (highestVersion == -1) {
nodeWithHighestVersion = node;
nodesWithHighestVersion.add(node);
highestVersion = version;
} else {
if (version > highestVersion) {
nodeWithHighestVersion = node;
nodesWithHighestVersion.clear();
nodesWithHighestVersion.add(node);
highestVersion = version;
} else if (version == highestVersion) {
nodesWithHighestVersion.add(node);
}
}
}
@ -161,24 +168,50 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
continue;
}
RoutingNode node = routingNodes.node(nodeWithHighestVersion.id());
// check if we need to throttle, NOTE, we don't check on NO since it does not apply
// since this is our master data!
if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.THROTTLE) {
Set<DiscoveryNode> throttledNodes = Sets.newHashSet();
Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id());
Decision decision = nodeAllocations.canAllocate(shard, node, allocation);
if (decision == NodeAllocation.Decision.THROTTLE) {
throttledNodes.add(discoNode);
} else if (decision == Decision.NO) {
noNodes.add(discoNode);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode);
}
// we found a match
changed = true;
node.add(shard);
unassignedIterator.remove();
// found a node, so no throttling, no "no", and break out of the loop
throttledNodes.clear();
noNodes.clear();
break;
}
}
if (throttledNodes.isEmpty()) {
// if we have a node that we "can't" allocate to, force allocation, since this is our master data!
if (!noNodes.isEmpty()) {
DiscoveryNode discoNode = noNodes.iterator().next();
RoutingNode node = routingNodes.node(discoNode.id());
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode);
}
// we found a match
changed = true;
node.add(shard);
unassignedIterator.remove();
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeWithHighestVersion);
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, throttledNodes);
}
// we are throttling this, but we have enough to allocate to this node, ignore it for now
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeWithHighestVersion);
}
// we found a match
changed = true;
node.add(shard);
unassignedIterator.remove();
}
}
@ -301,7 +334,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
nodeIds = nodes.dataNodes().keySet();
} else {
// clean nodes that have failed
for (TObjectLongIterator<DiscoveryNode> it = shardStates.iterator(); it.hasNext();) {
for (TObjectLongIterator<DiscoveryNode> it = shardStates.iterator(); it.hasNext(); ) {
it.advance();
if (!nodes.nodeExists(it.key().id())) {
it.remove();
@ -351,7 +384,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
} else {
nodesIds = Sets.newHashSet();
// clean nodes that have failed
for (Iterator<DiscoveryNode> it = shardStores.keySet().iterator(); it.hasNext();) {
for (Iterator<DiscoveryNode> it = shardStores.keySet().iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!nodes.nodeExists(node.id())) {
it.remove();