From a07030ccf3873f0dd5835ce8918247d04232ab37 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 9 Jun 2011 10:51:45 +0300 Subject: [PATCH] improve local shard allocation to utilize same version shards allocation within the same replication group --- .../local/LocalGatewayNodeAllocation.java | 73 ++++++++++++++----- 1 file changed, 53 insertions(+), 20 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index bfbe265fe9f..976b15b6e83 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -27,7 +27,11 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.*; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; @@ -110,8 +114,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { int numberOfAllocationsFound = 0; long highestVersion = -1; - DiscoveryNode nodeWithHighestVersion = null; - for (TObjectLongIterator it = nodesState.iterator(); it.hasNext();) { + Set nodesWithHighestVersion = Sets.newHashSet(); + for (TObjectLongIterator it = nodesState.iterator(); it.hasNext(); ) { it.advance(); DiscoveryNode node = it.key(); long version = it.value(); @@ -122,12 +126,15 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { if (version != -1) { numberOfAllocationsFound++; if (highestVersion == -1) { - nodeWithHighestVersion = node; + nodesWithHighestVersion.add(node); highestVersion = version; } else { if (version > highestVersion) { - nodeWithHighestVersion = node; + nodesWithHighestVersion.clear(); + nodesWithHighestVersion.add(node); highestVersion = version; + } else if (version == highestVersion) { + nodesWithHighestVersion.add(node); } } } @@ -161,24 +168,50 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { continue; } - RoutingNode node = routingNodes.node(nodeWithHighestVersion.id()); - // check if we need to throttle, NOTE, we don't check on NO since it does not apply - // since this is our master data! - if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.THROTTLE) { + Set throttledNodes = Sets.newHashSet(); + Set noNodes = Sets.newHashSet(); + for (DiscoveryNode discoNode : nodesWithHighestVersion) { + RoutingNode node = routingNodes.node(discoNode.id()); + Decision decision = nodeAllocations.canAllocate(shard, node, allocation); + if (decision == NodeAllocation.Decision.THROTTLE) { + throttledNodes.add(discoNode); + } else if (decision == Decision.NO) { + noNodes.add(discoNode); + } else { + if (logger.isDebugEnabled()) { + logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode); + } + // we found a match + changed = true; + node.add(shard); + unassignedIterator.remove(); + + // found a node, so no throttling, no "no", and break out of the loop + throttledNodes.clear(); + noNodes.clear(); + break; + } + } + if (throttledNodes.isEmpty()) { + // if we have a node that we "can't" allocate to, force allocation, since this is our master data! + if (!noNodes.isEmpty()) { + DiscoveryNode discoNode = noNodes.iterator().next(); + RoutingNode node = routingNodes.node(discoNode.id()); + if (logger.isDebugEnabled()) { + logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode); + } + // we found a match + changed = true; + node.add(shard); + unassignedIterator.remove(); + } + } else { if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeWithHighestVersion); + logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, throttledNodes); } // we are throttling this, but we have enough to allocate to this node, ignore it for now unassignedIterator.remove(); routingNodes.ignoredUnassigned().add(shard); - } else { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeWithHighestVersion); - } - // we found a match - changed = true; - node.add(shard); - unassignedIterator.remove(); } } @@ -301,7 +334,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { nodeIds = nodes.dataNodes().keySet(); } else { // clean nodes that have failed - for (TObjectLongIterator it = shardStates.iterator(); it.hasNext();) { + for (TObjectLongIterator it = shardStates.iterator(); it.hasNext(); ) { it.advance(); if (!nodes.nodeExists(it.key().id())) { it.remove(); @@ -351,7 +384,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } else { nodesIds = Sets.newHashSet(); // clean nodes that have failed - for (Iterator it = shardStores.keySet().iterator(); it.hasNext();) { + for (Iterator it = shardStores.keySet().iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); if (!nodes.nodeExists(node.id())) { it.remove();