From 7592862646bbbcd80f26c64b9b039ca39c0313e2 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 22 Aug 2010 01:00:37 +0300 Subject: [PATCH] add a callback for allocation as well --- .../cluster/routing/RoutingNodes.java | 8 ++++++- .../cluster/routing/RoutingTable.java | 5 +++-- .../routing/allocation/NodeAllocation.java | 3 +++ .../routing/allocation/NodeAllocations.java | 9 ++++++++ ...ferUnallocatedShardUnassignedStrategy.java | 21 +++++++++++++------ .../allocation/PreferUnallocatedStrategy.java | 1 + ...plicaAfterPrimaryActiveNodeAllocation.java | 5 +++++ .../allocation/SameShardNodeAllocation.java | 5 +++++ .../routing/allocation/ShardsAllocation.java | 1 + 9 files changed, 49 insertions(+), 9 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 386bed7c515..f43d35cfd71 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -28,7 +28,7 @@ import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.Maps.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ @NotThreadSafe public class RoutingNodes implements Iterable { @@ -41,6 +41,8 @@ public class RoutingNodes implements Iterable { private final List unassigned = newArrayList(); + private final List ignoredUnassigned = newArrayList(); + public RoutingNodes(MetaData metaData, RoutingTable routingTable) { this.metaData = metaData; this.routingTable = routingTable; @@ -106,6 +108,10 @@ public class RoutingNodes implements Iterable { return !unassigned.isEmpty(); } + public List ignoredUnassigned() { + return this.ignoredUnassigned; + } + public List unassigned() { return this.unassigned; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index d46f749e236..c6a2fa9d8f5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Iterables; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.io.stream.StreamInput; @@ -37,7 +38,7 @@ import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.Maps.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ @Immutable public class RoutingTable implements Iterable { @@ -232,7 +233,7 @@ public class RoutingTable implements Iterable { indexBuilder.addShard(new ImmutableShardRouting(shardRoutingEntry)); } } - for (MutableShardRouting shardRoutingEntry : routingNodes.unassigned()) { + for (MutableShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.ignoredUnassigned())) { String index = shardRoutingEntry.index(); IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index); if (indexBuilder == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java index c4906a51880..704c99665fa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; @@ -47,5 +48,7 @@ public interface NodeAllocation { abstract boolean allocate(); } + boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes); + Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index bb26f647f2d..fb185f74001 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; @@ -51,6 +52,14 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); } + @Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) { + boolean changed = false; + for (NodeAllocation allocation : allocations) { + changed |= allocation.allocate(routingNodes, nodes); + } + return changed; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { Decision ret = Decision.YES; for (NodeAllocation allocation : allocations) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java index 2dbc5cd64f5..756db6b81a3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java @@ -227,13 +227,22 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent } if (lastNodeMatched != null) { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); + if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) { + if (logger.isTraceEnabled()) { + logger.trace("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); + } + // we are throttling this, but we have enough to allocate to this node, ignore it for now + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + } else { + if (logger.isDebugEnabled()) { + logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); + } + // we found a match + changed = true; + lastNodeMatched.add(shard); + unassignedIterator.remove(); } - // we found a match - changed = true; - lastNodeMatched.add(shard); - unassignedIterator.remove(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedStrategy.java index 1d41c6e1a38..bdee46487fe 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedStrategy.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; /** * @author kimchy (shay.banon) */ +// TODO move this to be a NodeAllocation (once we remove the md5 and make listing fast for Unassigned impl) public interface PreferUnallocatedStrategy { void prefetch(IndexMetaData index, DiscoveryNodes nodes); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java index a3a4142ccff..9e0515d56e9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -38,6 +39,10 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends AbstractComponent i super(settings); } + @Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) { + return false; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { if (shardRouting.primary()) { return Decision.YES; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java index 31cb98418ea..a9d1179a9dd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -38,6 +39,10 @@ public class SameShardNodeAllocation extends AbstractComponent implements NodeAl super(settings); } + @Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) { + return false; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { for (MutableShardRouting current : node.shards()) { // we do not allow for two shards of the same shard id to exists on the same node diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java index b8c247cb127..4a772e77139 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java @@ -125,6 +125,7 @@ public class ShardsAllocation extends AbstractComponent { if (preferUnallocatedStrategy != null) { changed |= preferUnallocatedStrategy.allocateUnassigned(routingNodes, nodes); } + changed |= nodeAllocations.allocate(routingNodes, nodes); changed |= allocateUnassigned(routingNodes); // elect primaries again, in case this is needed with unassigned allocation changed |= electPrimaries(routingNodes);