From c2073c343df39afbe10db22492aaefa23787e4a8 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 16 Oct 2012 09:03:54 -0400 Subject: [PATCH] improve allocation decision to allow to explain why a decision has been made building the infra to support explaining why an allocation decision has been made, for example, why a shard is not allocated on a specific node --- .../allocator/EvenShardsCountAllocator.java | 13 ++- .../command/AllocateAllocationCommand.java | 6 +- .../command/MoveAllocationCommand.java | 10 +- .../allocation/decider/AllocationDecider.java | 50 +------- .../decider/AllocationDeciders.java | 16 ++- .../routing/allocation/decider/Decision.java | 107 ++++++++++++++++++ .../BlobReuseExistingGatewayAllocator.java | 9 +- .../gateway/local/LocalGatewayAllocator.java | 17 +-- 8 files changed, 148 insertions(+), 80 deletions(-) create mode 100644 src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java index 711f8a2472f..8561408814f 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -77,7 +78,8 @@ public class EvenShardsCountAllocator 
extends AbstractComponent implements Shard lastNode = 0; } - if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.YES) { int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size(); if (numberOfShardsToAllocate <= 0) { continue; @@ -96,7 +98,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard MutableShardRouting shard = it.next(); // go over the nodes and try and allocate the remaining ones for (RoutingNode routingNode : sortedNodesLeastToHigh(allocation)) { - if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) { + Decision decision = allocation.deciders().canAllocate(shard, routingNode, allocation); + if (decision.type() == Decision.Type.YES) { changed = true; routingNode.add(shard); it.remove(); @@ -142,7 +145,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard continue; } - if (allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { + Decision decision = allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation); + if (decision.type() == Decision.Type.YES) { changed = true; lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), lowRoutingNode.nodeId(), startedShard.currentNodeId(), @@ -179,7 +183,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard if (nodeToCheck.nodeId().equals(node.nodeId())) { continue; } - if (allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation).allocate()) { + Decision decision = allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation); + if (decision.type() == Decision.Type.YES) { nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(), nodeToCheck.nodeId(), 
shardRouting.currentNodeId(), shardRouting.primary(), INITIALIZING, shardRouting.version() + 1)); diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java index 649cd6e5af9..7e94bfe4448 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; @@ -160,8 +161,9 @@ public class AllocateAllocationCommand implements AllocationCommand { } RoutingNode routingNode = allocation.routingNodes().node(discoNode.id()); - if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) { - throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed"); + Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation); + if (decision.type() == Decision.Type.NO) { + throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision); } // go over and remove it from the unassigned for (Iterator it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) { diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java 
b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java index 5676d466167..6432180625d 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java @@ -27,7 +27,7 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; @@ -158,11 +158,11 @@ public class MoveAllocationCommand implements AllocationCommand { } RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.id()); - AllocationDecider.Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation); - if (!decision.allowed()) { - throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed"); + Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation); + if (decision.type() == Decision.Type.NO) { + throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed, reason: " + decision); } - if (!decision.allocate()) { + if (decision.type() == Decision.Type.THROTTLE) { // its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it... 
} diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index cab2c5f19b2..34752c13679 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -30,54 +30,6 @@ import org.elasticsearch.common.settings.Settings; */ public abstract class AllocationDecider extends AbstractComponent { - public static enum Decision { - YES { - @Override - public boolean allocate() { - return true; - } - - @Override - public boolean allowed() { - return true; - } - }, - NO { - @Override - public boolean allocate() { - return false; - } - - @Override - public boolean allowed() { - return false; - } - }, - THROTTLE { - @Override - public boolean allocate() { - return false; - } - - @Override - public boolean allowed() { - return true; - } - }; - - /** - * It can be allocated *now* on a node. Note, it might be {@link #allowed()} to be allocated - * on a node, yet, allocate will be false since its being throttled for example. - */ - public abstract boolean allocate(); - - /** - * Is allocation allowed on a node. Note, this does not mean that we should allocate *now*, - * though, in extreme cases, we might "force" allocation. 
- */ - public abstract boolean allowed(); - } - protected AllocationDecider(Settings settings) { super(settings); } @@ -87,7 +39,7 @@ public abstract class AllocationDecider extends AbstractComponent { } public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return Decision.YES; + return Decision.ALWAYS; } /** diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 72a7b6f8b31..b8a71e1eb6a 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -70,18 +70,16 @@ public class AllocationDeciders extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - Decision ret = Decision.YES; - // first, check if its in the ignored, if so, return NO if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) { return Decision.NO; } - // now, go over the registered allocations - for (AllocationDecider allocation1 : allocations) { - Decision decision = allocation1.canAllocate(shardRouting, node, allocation); - if (decision == Decision.NO) { - return Decision.NO; - } else if (decision == Decision.THROTTLE) { - ret = Decision.THROTTLE; + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider allocationDecider : allocations) { + Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation); + // the assumption is that a decider that returns the static instance Decision#ALWAYS + // does not really implements canAllocate + if (decision != Decision.ALWAYS) { + ret.add(decision); } } return ret; diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java 
/**
 * The outcome of asking an allocation decider whether a shard may be placed on a node.
 * <p>
 * A decision always resolves to one of the three {@link Type} values via {@link #type()}.
 * In addition, its {@link #toString()} renders a human readable explanation, so callers can
 * report <em>why</em> an allocation was refused or throttled (for example by appending the
 * decision to an exception message).
 * <p>
 * Two implementations are provided: {@link Single}, one verdict with an optional explanation,
 * and {@link Multi}, which aggregates the verdicts of several deciders.
 */
public abstract class Decision {

    /**
     * Verdict of type {@link Type#YES} used as the "no opinion" default.
     * <p>
     * This is intentionally a <em>different instance</em> from {@link #YES}: code that
     * aggregates decisions can compare against it by identity ({@code decision != ALWAYS})
     * to skip deciders that did not really make a decision.
     */
    public static final Decision ALWAYS = new Single(Type.YES);

    /** Unexplained {@link Type#YES} verdict. */
    public static final Decision YES = new Single(Type.YES);

    /** Unexplained {@link Type#NO} verdict. */
    public static final Decision NO = new Single(Type.NO);

    /** Unexplained {@link Type#THROTTLE} verdict. */
    public static final Decision THROTTLE = new Single(Type.THROTTLE);

    /**
     * Creates a single decision carrying an explanation.
     *
     * @param type              the verdict
     * @param explanation       a {@link java.util.Formatter} pattern describing the reason,
     *                          or {@code null} for no explanation
     * @param explanationParams arguments substituted into {@code explanation}; they are only
     *                          formatted when {@link #toString()} is called
     * @return a new {@link Single} decision
     */
    public static Decision single(Type type, String explanation, Object... explanationParams) {
        return new Single(type, explanation, explanationParams);
    }

    /** The three possible verdicts of a decision. */
    public static enum Type {
        YES,
        NO,
        THROTTLE
    }

    /**
     * @return the verdict this decision resolves to
     */
    public abstract Type type();

    /**
     * A single verdict with an optional, lazily formatted explanation.
     */
    public static class Single extends Decision {
        private final Type type;
        private final String explanation;
        private final Object[] explanationParams;

        /**
         * Creates a decision without an explanation.
         *
         * @param type the verdict
         */
        public Single(Type type) {
            this(type, null, (Object[]) null);
        }

        /**
         * Creates a decision with an explanation.
         *
         * @param type              the verdict
         * @param explanation       a {@link java.util.Formatter} pattern, or {@code null}
         * @param explanationParams arguments for {@code explanation}
         */
        public Single(Type type, String explanation, Object... explanationParams) {
            this.type = type;
            this.explanation = explanation;
            this.explanationParams = explanationParams;
        }

        @Override
        public Type type() {
            return this.type;
        }

        /**
         * Renders the decision as {@code TYPE(explanation)}, or {@code TYPE()} when there is
         * no explanation.
         * <p>
         * If the explanation is not a valid format pattern for the supplied arguments (for
         * example it contains a literal {@code %}), the raw explanation text is used instead
         * of propagating the formatting failure.
         */
        @Override
        public String toString() {
            if (explanation == null) {
                return type + "()";
            }
            return type + "(" + formatExplanation() + ")";
        }

        /** Formats the explanation, falling back to the unformatted text on a bad pattern. */
        private String formatExplanation() {
            try {
                return String.format(explanation, explanationParams);
            } catch (java.util.IllegalFormatException e) {
                // toString() is used while building error messages and logs; a malformed
                // pattern must not hide the decision behind a formatting exception
                return explanation;
            }
        }
    }

    /**
     * Aggregates several decisions into one.
     * <p>
     * The combined verdict is {@link Type#NO} as soon as any member says NO, otherwise
     * {@link Type#THROTTLE} if any member throttles, otherwise {@link Type#YES} (which is
     * also the verdict of an empty aggregate).
     */
    public static class Multi extends Decision {

        private final List<Decision> decisions = new java.util.ArrayList<Decision>();

        /**
         * Adds a decision to this aggregate.
         *
         * @param decision the decision to add, must not be {@code null}
         * @return this instance, to allow chaining
         * @throws NullPointerException if {@code decision} is {@code null}
         */
        public Multi add(Decision decision) {
            if (decision == null) {
                // fail here rather than later, deep inside type() or toString()
                throw new NullPointerException("decision must not be null");
            }
            decisions.add(decision);
            return this;
        }

        @Override
        public Type type() {
            Type ret = Type.YES;
            for (Decision decision : decisions) {
                Type type = decision.type();
                if (type == Type.NO) {
                    // a single NO vetoes the allocation, no need to look further
                    return type;
                } else if (type == Type.THROTTLE) {
                    ret = type;
                }
            }
            return ret;
        }

        /**
         * Renders every member decision, in insertion order, each wrapped in square brackets.
         * An empty aggregate renders as the empty string.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            for (Decision decision : decisions) {
                sb.append("[").append(decision.toString()).append("]");
            }
            return sb.toString();
        }
    }
}
org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; -import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -119,7 +119,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme continue; } // if its THROTTLING, we are not going to allocate it to this node, so ignore it as well - if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.YES) { canBeAllocatedToAtLeastOneNode = true; break; } @@ -153,7 +154,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme // check if we can allocate on that node... 
// we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) { + if (allocation.deciders().canAllocate(shard, node, allocation).type() == Decision.Type.NO) { continue; } @@ -236,7 +237,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme } if (lastNodeMatched != null) { - if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) { + if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation).type() == Decision.Type.THROTTLE) { if (logger.isTraceEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java index 20e67f052d2..9d65b72a0ff 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java @@ -35,7 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; -import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -195,10 +195,10 @@ public class 
LocalGatewayAllocator extends AbstractComponent implements GatewayA Set noNodes = Sets.newHashSet(); for (DiscoveryNode discoNode : nodesWithHighestVersion) { RoutingNode node = routingNodes.node(discoNode.id()); - AllocationDecider.Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - if (decision == AllocationDecider.Decision.THROTTLE) { + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.THROTTLE) { throttledNodes.add(discoNode); - } else if (decision == AllocationDecider.Decision.NO) { + } else if (decision.type() == Decision.Type.NO) { noNodes.add(discoNode); } else { if (logger.isDebugEnabled()) { @@ -258,7 +258,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA } // if we can't allocate it on a node, ignore it, for example, this handles // cases for only allocating a replica after a primary - if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.YES) { canBeAllocatedToAtLeastOneNode = true; break; } @@ -292,7 +293,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA // check if we can allocate on that node... 
// we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) { + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.NO) { continue; } @@ -328,7 +330,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA if (lastNodeMatched != null) { // we only check on THROTTLE since we checked before before on NO - if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) { + Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation); + if (decision.type() == Decision.Type.THROTTLE) { if (logger.isTraceEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); }