Add explanations for all AllocationDeciders

Relates to #4380
Relates to #2483
This commit is contained in:
Lee Hinman 2014-01-28 15:22:27 -07:00
parent d68d8fbf11
commit 5448477c54
19 changed files with 213 additions and 96 deletions

View File

@ -109,7 +109,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, true);
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
clusterStateToSend = newState;
if (request.dryRun) {

View File

@ -153,4 +153,34 @@ public class DiscoveryNodeFilters {
return true;
}
}
/**
* Generates a human-readable string for the DiscoverNodeFilters.
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
int entryCount = filters.size();
for (Map.Entry<String, String[]> entry : filters.entrySet()) {
String attr = entry.getKey();
String[] values = entry.getValue();
sb.append(attr);
sb.append(":\"");
int valueCount = values.length;
for (String value : values) {
sb.append(value);
if (valueCount > 1) {
sb.append(" " + opType.toString() + " ");
}
valueCount--;
}
sb.append("\"");
if (entryCount > 1) {
sb.append(",");
}
entryCount--;
}
return sb.toString();
}
}

View File

@ -114,12 +114,17 @@ public class AllocationService extends AbstractComponent {
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) throws ElasticsearchException {
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) {
return reroute(clusterState, commands, false);
}
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean debug) throws ElasticsearchException {
RoutingNodes routingNodes = clusterState.routingNodes();
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
@ -137,10 +142,20 @@ public class AllocationService extends AbstractComponent {
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState) {
return reroute(clusterState, false);
}
/**
* Reroutes the routing table based on the live nodes.
* <p/>
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState, boolean debug) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
@ -153,10 +168,20 @@ public class AllocationService extends AbstractComponent {
* them.
*/
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState) {
return rerouteWithNoReassign(clusterState, false);
}
/**
* Only handles reroute but *without* any reassignment of unassigned shards or rebalancing. Does
* make sure to handle removed nodes, but only moved the shards to UNASSIGNED, does not reassign
* them.
*/
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState, boolean debug) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap;
@ -99,6 +100,8 @@ public class RoutingAllocation {
private boolean ignoreDisable = false;
private boolean debugDecision = false;
/**
* Creates a new {@link RoutingAllocation}
*
@ -173,6 +176,14 @@ public class RoutingAllocation {
return this.ignoreDisable;
}
public void debugDecision(boolean debug) {
this.debugDecision = debug;
}
public boolean debugDecision() {
return this.debugDecision;
}
public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
@ -183,4 +194,16 @@ public class RoutingAllocation {
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId));
}
/**
* Create a routing decision, including the reason if the debug flag is
* turned on
*/
public Decision decision(Decision decision, String reason, Object... params) {
if (debugDecision()) {
return Decision.single(decision.type(), reason, params);
} else {
return decision;
}
}
}

View File

@ -52,7 +52,11 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canRebalance(shardRouting, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
@ -73,7 +77,12 @@ public class AllocationDeciders extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate [{}] on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
return decision;
// short circuit only if debugging is not enabled
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
// the assumption is that a decider that returns the static instance Decision#ALWAYS
// does not really implements canAllocate
@ -99,7 +108,11 @@ public class AllocationDeciders extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Shard [{}] can not remain on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
@ -113,7 +126,11 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canAllocate(shardRouting, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
@ -127,7 +144,11 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canAllocate(node, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}

View File

@ -158,17 +158,17 @@ public class AwarenessAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true) ? Decision.YES : Decision.NO;
return underCapacity(shardRouting, node, allocation, true);
}
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false) ? Decision.YES : Decision.NO;
return underCapacity(shardRouting, node, allocation, false);
}
private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
return true;
return allocation.decision(Decision.YES, "no allocation awareness enabled");
}
IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.index());
@ -176,7 +176,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
return false;
return allocation.decision(Decision.NO, "node does not contain awareness attribute: [%s]", awarenessAttribute);
}
// build attr_value -> nodes map
@ -234,7 +234,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute));
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return false;
return allocation.decision(Decision.NO, "too many shards on nodes for attribute: [%s]", awarenessAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
@ -242,6 +242,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
}
}
return true;
return allocation.decision(Decision.YES, "node meets awareness requirements");
}
}

View File

@ -19,15 +19,11 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import java.util.Locale;
/**
@ -91,27 +87,27 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has unassigned primary shards");
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has inactive primary shards");
}
return Decision.YES;
return allocation.decision(Decision.YES, "all primary shards are active");
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if ( allocation.routingNodes().hasUnassignedShards() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has unassigned shards");
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has inactive shards");
}
}
// type == Type.ALWAYS
return Decision.YES;
return allocation.decision(Decision.YES, "all shards are active");
}
}

View File

@ -65,11 +65,12 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
return Decision.YES;
return allocation.decision(Decision.YES, "all concurrent rebalances are allowed");
}
if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
return Decision.NO;
return allocation.decision(Decision.NO, "too man concurrent rebalances [%d], limit: [%d]",
allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
}
return Decision.YES;
return allocation.decision(Decision.YES, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
}
}

View File

@ -104,20 +104,28 @@ public class DisableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return Decision.YES;
return allocation.decision(Decision.YES, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
// on a special disable allocation flag
return indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation) ? Decision.NO : Decision.YES;
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation)) {
return allocation.decision(Decision.NO, "new primary allocation is disabled");
} else {
return allocation.decision(Decision.YES, "new primary allocation is enabled");
}
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION, disableAllocation)) {
return Decision.NO;
return allocation.decision(Decision.NO, "all allocation is disabled");
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION, disableReplicaAllocation)) {
return shardRouting.primary() ? Decision.YES : Decision.NO;
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "primary allocation is enabled");
} else {
return allocation.decision(Decision.NO, "replica allocation is disabled");
}
}
return Decision.YES;
return allocation.decision(Decision.YES, "all allocation is enabled");
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -101,10 +100,6 @@ public class DiskThresholdDecider extends AllocationDecider {
}
}
public DiskThresholdDecider() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public DiskThresholdDecider(Settings settings) {
this(settings, new NodeSettingsService(settings));
}
@ -134,11 +129,11 @@ public class DiskThresholdDecider extends AllocationDecider {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return Decision.YES;
return allocation.decision(Decision.YES, "disk threshold decider disabled");
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return Decision.YES;
return allocation.decision(Decision.YES, "only a single node is present");
}
ClusterInfo clusterInfo = allocation.clusterInfo();
@ -146,7 +141,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return Decision.YES;
return allocation.decision(Decision.YES, "cluster info unavailable");
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
@ -155,7 +150,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return Decision.YES;
return allocation.decision(Decision.YES, "disk usages unavailable");
}
DiskUsage usage = usages.get(node.nodeId());
@ -180,14 +175,16 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return Decision.NO;
return allocation.decision(Decision.NO, "less than required [%s] free on node, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdLow) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], preventing allocation",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
}
return Decision.NO;
return allocation.decision(Decision.NO, "less than required [%d%%] free disk on node, free: [%d%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
}
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
@ -198,24 +195,26 @@ public class DiskThresholdDecider extends AllocationDecider {
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
logger.warn("After allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
return Decision.NO;
return allocation.decision(Decision.NO, "after allocation less than required [%s] free on node, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
logger.warn("After allocating, node [{}] would have less than the required {}% free disk threshold ({}% free), preventing allocation",
node.nodeId(), freeDiskThresholdHigh, freeSpaceAfterShard);
return Decision.NO;
return allocation.decision(Decision.NO, "after allocation less than required [%d%%] free disk on node, free: [%d%%]",
freeDiskThresholdLow, freeSpaceAfterShard);
}
return Decision.YES;
return allocation.decision(Decision.YES, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes));
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return Decision.YES;
return allocation.decision(Decision.YES, "disk threshold decider disabled");
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return Decision.YES;
return allocation.decision(Decision.YES, "only a single node is present");
}
ClusterInfo clusterInfo = allocation.clusterInfo();
@ -223,7 +222,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return Decision.YES;
return allocation.decision(Decision.YES, "cluster info unavailable");
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
@ -231,7 +230,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return Decision.YES;
return allocation.decision(Decision.YES, "disk usages unavailable");
}
DiskUsage usage = usages.get(node.nodeId());
@ -256,17 +255,19 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
return Decision.NO;
return allocation.decision(Decision.NO, "after allocation less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdHigh) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
return Decision.NO;
return allocation.decision(Decision.NO, "after allocation less than required [%d%%] free disk on node, free: [%d%%]",
freeDiskThresholdHigh, freeDiskPercentage);
}
return Decision.YES;
return allocation.decision(Decision.YES, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
}
/**

View File

@ -60,7 +60,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return Decision.YES;
return allocation.decision(Decision.YES, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
@ -73,17 +73,21 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
}
switch (enable) {
case ALL:
return Decision.YES;
return allocation.decision(Decision.YES, "all allocations are allowed");
case NONE:
return Decision.NO;
return allocation.decision(Decision.NO, "no allocations are allowed");
case NEW_PRIMARIES:
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
return Decision.YES;
return allocation.decision(Decision.YES, "new primary allocations are allowed");
} else {
return Decision.NO;
return allocation.decision(Decision.NO, "non-new primary allocations are disallowed");
}
case PRIMARIES:
return shardRouting.primary() ? Decision.YES : Decision.NO;
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "primary allocations are allowed");
} else {
return allocation.decision(Decision.NO, "replica allocations are disallowed");
}
default:
throw new ElasticsearchIllegalStateException("Unknown allocation option");
}

View File

@ -98,49 +98,49 @@ public class FilterAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node, allocation) ? Decision.NO : Decision.YES;
return shouldFilter(shardRouting, node, allocation);
}
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node, allocation) ? Decision.NO : Decision.YES;
return shouldFilter(shardRouting, node, allocation);
}
private boolean shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (clusterRequireFilters != null) {
if (!clusterRequireFilters.match(node.node())) {
return true;
return allocation.decision(Decision.NO, "node does not match global required filters [%s]", clusterRequireFilters);
}
}
if (clusterIncludeFilters != null) {
if (!clusterIncludeFilters.match(node.node())) {
return true;
return allocation.decision(Decision.NO, "node does not match global include filters [%s]", clusterIncludeFilters);
}
}
if (clusterExcludeFilters != null) {
if (clusterExcludeFilters.match(node.node())) {
return true;
return allocation.decision(Decision.NO, "node matches global exclude filters [%s]", clusterExcludeFilters);
}
}
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
if (indexMd.requireFilters() != null) {
if (!indexMd.requireFilters().match(node.node())) {
return true;
return allocation.decision(Decision.NO, "node does not match index required filters [%s]", indexMd.requireFilters());
}
}
if (indexMd.includeFilters() != null) {
if (!indexMd.includeFilters().match(node.node())) {
return true;
return allocation.decision(Decision.NO, "node does not match index include filters [%s]", indexMd.includeFilters());
}
}
if (indexMd.excludeFilters() != null) {
if (indexMd.excludeFilters().match(node.node())) {
return true;
return allocation.decision(Decision.NO, "node matches index exclude filters [%s]", indexMd.excludeFilters());
}
}
return false;
return allocation.decision(Decision.YES, "node passes include/exclude/require filters");
}
class ApplySettings implements NodeSettingsService.Listener {

View File

@ -49,27 +49,29 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
if (sourceNodeId == null) { // we allocate - check primary
if (shardRouting.primary()) {
// we are the primary we can allocate wherever
return Decision.YES;
return allocation.decision(Decision.YES, "primary shard can be allocated anywhere");
}
final MutableShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) { // we have a primary - it's a start ;)
return Decision.YES;
return allocation.decision(Decision.YES, "no active primary shard yet");
}
sourceNodeId = primary.currentNodeId();
}
return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node);
return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node, allocation);
}
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target) {
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
return Decision.YES;
return allocation.decision(Decision.YES, "target node version [%s] is same or newer than source node version [%s]",
target.node().version(), source.node().version());
} else {
return Decision.NO;
return allocation.decision(Decision.NO, "target node version [%s] is older than source node version [%s]",
target.node().version(), source.node().version());
}
}
}

View File

@ -39,8 +39,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
// its ok to check for active here, since in relocation, a shard is split into two in routing
// nodes, once relocating, and one initializing
if (!allocation.routingNodes().allReplicasActive(shardRouting)) {
return Decision.NO;
return allocation.decision(Decision.NO, "not all replicas are active in cluster");
}
return Decision.YES;
return allocation.decision(Decision.YES, "all replicas are active in cluster");
}
}

View File

@ -43,12 +43,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
return Decision.YES;
return allocation.decision(Decision.YES, "shard is primary");
}
MutableShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) {
return Decision.NO;
return allocation.decision(Decision.NO, "primary shard is not yet active");
}
return Decision.YES;
return allocation.decision(Decision.YES, "primary is already active");
}
}

View File

@ -60,7 +60,7 @@ public class SameShardAllocationDecider extends AllocationDecider {
Iterable<MutableShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting);
for (MutableShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
return Decision.NO;
return allocation.decision(Decision.NO, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId());
}
}
if (sameHost) {
@ -83,13 +83,14 @@ public class SameShardAllocationDecider extends AllocationDecider {
if (checkNodeOnSameHost) {
for (MutableShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
return Decision.NO;
return allocation.decision(Decision.NO, "shard cannot be allocated on same host [%s] it already exists on",
node.nodeId());
}
}
}
}
}
}
return Decision.YES;
return allocation.decision(Decision.YES, "shard is not allocated to same node or host");
}
}

View File

@ -65,7 +65,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
int totalShardsPerNode = indexMd.settings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
if (totalShardsPerNode <= 0) {
return Decision.YES;
return allocation.decision(Decision.YES, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
}
int nodeCount = 0;
@ -80,9 +80,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeCount++;
}
if (nodeCount >= totalShardsPerNode) {
return Decision.NO;
return allocation.decision(Decision.NO, "too many shards for this index on node [%d], limit: [%d]",
nodeCount, totalShardsPerNode);
}
return Decision.YES;
return allocation.decision(Decision.YES, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
}
@Override
@ -90,7 +91,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
int totalShardsPerNode = indexMd.settings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
if (totalShardsPerNode <= 0) {
return Decision.YES;
return allocation.decision(Decision.YES, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
}
int nodeCount = 0;
@ -105,8 +106,9 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeCount++;
}
if (nodeCount > totalShardsPerNode) {
return Decision.NO;
return allocation.decision(Decision.NO, "too many shards for this index on node [%d], limit: [%d]",
nodeCount, totalShardsPerNode);
}
return Decision.YES;
return allocation.decision(Decision.YES, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
}
}

View File

@ -99,18 +99,19 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
SnapshotMetaData snapshotMetaData = allocation.metaData().custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null) {
// Snapshots are not running
return Decision.YES;
return allocation.decision(Decision.YES, "no snapshots are currently running");
}
for (SnapshotMetaData.Entry snapshot : snapshotMetaData.entries()) {
SnapshotMetaData.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
return Decision.NO;
return allocation.decision(Decision.NO, "snapshot for shard [%s] is currently running on node [%s]",
shardRouting.shardId(), shardSnapshotStatus.nodeId());
}
}
}
return Decision.YES;
return allocation.decision(Decision.YES, "shard not primary or relocation disabled");
}
}

View File

@ -85,9 +85,10 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (primariesInRecovery >= primariesInitialRecoveries) {
return Decision.THROTTLE;
return allocation.decision(Decision.THROTTLE, "too many primaries currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
} else {
return Decision.YES;
return allocation.decision(Decision.YES, "below primary recovery limit of [%d]", primariesInitialRecoveries);
}
}
}
@ -106,9 +107,10 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (currentRecoveries >= concurrentRecoveries) {
return Decision.THROTTLE;
return allocation.decision(Decision.THROTTLE, "too many shards currently recovering [%d], limit: [%d]",
currentRecoveries, concurrentRecoveries);
} else {
return Decision.YES;
return allocation.decision(Decision.YES, "below shard recovery limit of [%d]", concurrentRecoveries);
}
}