improve allocation decision to allow to explain why a decision has been made

building the infra to support explaining why an allocation decision has been made, for example, why a shard is not allocated on a specific node
This commit is contained in:
Shay Banon 2012-10-16 09:03:54 -04:00
parent c350dcd9cf
commit c2073c343d
8 changed files with 148 additions and 80 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -77,7 +78,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
lastNode = 0; lastNode = 0;
} }
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size(); int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
if (numberOfShardsToAllocate <= 0) { if (numberOfShardsToAllocate <= 0) {
continue; continue;
@ -96,7 +98,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
MutableShardRouting shard = it.next(); MutableShardRouting shard = it.next();
// go over the nodes and try and allocate the remaining ones // go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : sortedNodesLeastToHigh(allocation)) { for (RoutingNode routingNode : sortedNodesLeastToHigh(allocation)) {
if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) { Decision decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
if (decision.type() == Decision.Type.YES) {
changed = true; changed = true;
routingNode.add(shard); routingNode.add(shard);
it.remove(); it.remove();
@ -142,7 +145,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
continue; continue;
} }
if (allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { Decision decision = allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation);
if (decision.type() == Decision.Type.YES) {
changed = true; changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
lowRoutingNode.nodeId(), startedShard.currentNodeId(), lowRoutingNode.nodeId(), startedShard.currentNodeId(),
@ -179,7 +183,8 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
if (nodeToCheck.nodeId().equals(node.nodeId())) { if (nodeToCheck.nodeId().equals(node.nodeId())) {
continue; continue;
} }
if (allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation).allocate()) { Decision decision = allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation);
if (decision.type() == Decision.Type.YES) {
nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(), nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
nodeToCheck.nodeId(), shardRouting.currentNodeId(), nodeToCheck.nodeId(), shardRouting.currentNodeId(),
shardRouting.primary(), INITIALIZING, shardRouting.version() + 1)); shardRouting.primary(), INITIALIZING, shardRouting.version() + 1));

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -160,8 +161,9 @@ public class AllocateAllocationCommand implements AllocationCommand {
} }
RoutingNode routingNode = allocation.routingNodes().node(discoNode.id()); RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) { Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed"); if (decision.type() == Decision.Type.NO) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
} }
// go over and remove it from the unassigned // go over and remove it from the unassigned
for (Iterator<MutableShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) { for (Iterator<MutableShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) {

View File

@ -27,7 +27,7 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -158,11 +158,11 @@ public class MoveAllocationCommand implements AllocationCommand {
} }
RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.id()); RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.id());
AllocationDecider.Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation); Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
if (!decision.allowed()) { if (decision.type() == Decision.Type.NO) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed"); throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed, reason: " + decision);
} }
if (!decision.allocate()) { if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it... // its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
} }

View File

@ -30,54 +30,6 @@ import org.elasticsearch.common.settings.Settings;
*/ */
public abstract class AllocationDecider extends AbstractComponent { public abstract class AllocationDecider extends AbstractComponent {
public static enum Decision {
YES {
@Override
public boolean allocate() {
return true;
}
@Override
public boolean allowed() {
return true;
}
},
NO {
@Override
public boolean allocate() {
return false;
}
@Override
public boolean allowed() {
return false;
}
},
THROTTLE {
@Override
public boolean allocate() {
return false;
}
@Override
public boolean allowed() {
return true;
}
};
/**
* It can be allocated *now* on a node. Note, it might be {@link #allowed()} to be allocated
* on a node, yet, allocate will be <tt>false</tt> since its being throttled for example.
*/
public abstract boolean allocate();
/**
* Is allocation allowed on a node. Note, this does not mean that we should allocate *now*,
* though, in extreme cases, we might "force" allocation.
*/
public abstract boolean allowed();
}
protected AllocationDecider(Settings settings) { protected AllocationDecider(Settings settings) {
super(settings); super(settings);
} }
@ -87,7 +39,7 @@ public abstract class AllocationDecider extends AbstractComponent {
} }
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES; return Decision.ALWAYS;
} }
/** /**

View File

@ -70,18 +70,16 @@ public class AllocationDeciders extends AllocationDecider {
@Override @Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision ret = Decision.YES;
// first, check if its in the ignored, if so, return NO
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) { if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO; return Decision.NO;
} }
// now, go over the registered allocations Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocation1 : allocations) { for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocation1.canAllocate(shardRouting, node, allocation); Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation);
if (decision == Decision.NO) { // the assumption is that a decider that returns the static instance Decision#ALWAYS
return Decision.NO; // does not really implements canAllocate
} else if (decision == Decision.THROTTLE) { if (decision != Decision.ALWAYS) {
ret = Decision.THROTTLE; ret.add(decision);
} }
} }
return ret; return ret;

View File

@ -0,0 +1,107 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import com.google.common.collect.Lists;
import java.util.List;
/**
*/
public abstract class Decision {
public static final Decision ALWAYS = new Single(Type.YES);
public static final Decision YES = new Single(Type.YES);
public static final Decision NO = new Single(Type.NO);
public static final Decision THROTTLE = new Single(Type.THROTTLE);
public static Decision single(Type type, String explanation, Object... explanationParams) {
return new Single(type, explanation, explanationParams);
}
public static enum Type {
YES,
NO,
THROTTLE
}
public abstract Type type();
public static class Single extends Decision {
private final Type type;
private final String explanation;
private final Object[] explanationParams;
public Single(Type type) {
this(type, null, (Object[]) null);
}
public Single(Type type, String explanation, Object... explanationParams) {
this.type = type;
this.explanation = explanation;
this.explanationParams = explanationParams;
}
public Type type() {
return this.type;
}
@Override
public String toString() {
if (explanation == null) {
return type + "()";
}
return type + "(" + String.format(explanation, explanationParams) + ")";
}
}
public static class Multi extends Decision {
private final List<Decision> decisions = Lists.newArrayList();
public Multi add(Decision decision) {
decisions.add(decision);
return this;
}
@Override
public Type type() {
Type ret = Type.YES;
for (int i = 0; i < decisions.size(); i++) {
Type type = decisions.get(i).type();
if (type == Type.NO) {
return type;
} else if (type == Type.THROTTLE) {
ret = type;
}
}
return ret;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Decision decision : decisions) {
sb.append("[").append(decision.toString()).append("]");
}
return sb.toString();
}
}
}

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -119,7 +119,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
continue; continue;
} }
// if its THROTTLING, we are not going to allocate it to this node, so ignore it as well // if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true; canBeAllocatedToAtLeastOneNode = true;
break; break;
} }
@ -153,7 +154,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
// check if we can allocate on that node... // check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data" // we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time // then we will try and assign it next time
if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) { if (allocation.deciders().canAllocate(shard, node, allocation).type() == Decision.Type.NO) {
continue; continue;
} }
@ -236,7 +237,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
} }
if (lastNodeMatched != null) { if (lastNodeMatched != null) {
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) { if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation).type() == Decision.Type.THROTTLE) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
} }

View File

@ -35,7 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -195,10 +195,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
Set<DiscoveryNode> noNodes = Sets.newHashSet(); Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) { for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id()); RoutingNode node = routingNodes.node(discoNode.id());
AllocationDecider.Decision decision = allocation.deciders().canAllocate(shard, node, allocation); Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision == AllocationDecider.Decision.THROTTLE) { if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode); throttledNodes.add(discoNode);
} else if (decision == AllocationDecider.Decision.NO) { } else if (decision.type() == Decision.Type.NO) {
noNodes.add(discoNode); noNodes.add(discoNode);
} else { } else {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -258,7 +258,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
} }
// if we can't allocate it on a node, ignore it, for example, this handles // if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary // cases for only allocating a replica after a primary
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true; canBeAllocatedToAtLeastOneNode = true;
break; break;
} }
@ -292,7 +293,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
// check if we can allocate on that node... // check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data" // we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time // then we will try and assign it next time
if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) { Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.NO) {
continue; continue;
} }
@ -328,7 +330,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
if (lastNodeMatched != null) { if (lastNodeMatched != null) {
// we only check on THROTTLE since we checked before before on NO // we only check on THROTTLE since we checked before before on NO
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) { Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
} }