Improves allocation decider decision explanation messages (#21771)
This commit improves the decision explanation messages, particularly for NO decisions, in the various AllocationDecider implementations by including the setting(s) in the explanation message that led to the decision. This commit also returns a THROTTLE decision instead of a NO decision when the concurrent rebalances limit has been reached in ConcurrentRebalanceAllocationDecider, because it more accurately reflects a temporary throttling that will turn into a YES decision once the number of concurrent rebalances lessens, as opposed to a more permanent NO decision (e.g. due to filtering).
This commit is contained in:
parent
a6082eb563
commit
07bd0a30f0
|
@ -224,12 +224,15 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||||
Setting.boolSetting(SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false, Property.Dynamic, Property.IndexScope);
|
Setting.boolSetting(SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false, Property.Dynamic, Property.IndexScope);
|
||||||
public static final String INDEX_UUID_NA_VALUE = "_na_";
|
public static final String INDEX_UUID_NA_VALUE = "_na_";
|
||||||
|
|
||||||
|
public static final String INDEX_ROUTING_REQUIRE_GROUP_PREFIX = "index.routing.allocation.require";
|
||||||
|
public static final String INDEX_ROUTING_INCLUDE_GROUP_PREFIX = "index.routing.allocation.include";
|
||||||
|
public static final String INDEX_ROUTING_EXCLUDE_GROUP_PREFIX = "index.routing.allocation.exclude";
|
||||||
public static final Setting<Settings> INDEX_ROUTING_REQUIRE_GROUP_SETTING =
|
public static final Setting<Settings> INDEX_ROUTING_REQUIRE_GROUP_SETTING =
|
||||||
Setting.groupSetting("index.routing.allocation.require.", Property.Dynamic, Property.IndexScope);
|
Setting.groupSetting(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope);
|
||||||
public static final Setting<Settings> INDEX_ROUTING_INCLUDE_GROUP_SETTING =
|
public static final Setting<Settings> INDEX_ROUTING_INCLUDE_GROUP_SETTING =
|
||||||
Setting.groupSetting("index.routing.allocation.include.", Property.Dynamic, Property.IndexScope);
|
Setting.groupSetting(INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope);
|
||||||
public static final Setting<Settings> INDEX_ROUTING_EXCLUDE_GROUP_SETTING =
|
public static final Setting<Settings> INDEX_ROUTING_EXCLUDE_GROUP_SETTING =
|
||||||
Setting.groupSetting("index.routing.allocation.exclude.", Property.Dynamic, Property.IndexScope);
|
Setting.groupSetting(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.IndexScope);
|
||||||
public static final Setting<Settings> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
|
public static final Setting<Settings> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
|
||||||
Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!!
|
Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!!
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,8 @@ public class DiskThresholdSettings {
|
||||||
Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
|
Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
|
||||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||||
|
|
||||||
|
private volatile String lowWatermarkRaw;
|
||||||
|
private volatile String highWatermarkRaw;
|
||||||
private volatile Double freeDiskThresholdLow;
|
private volatile Double freeDiskThresholdLow;
|
||||||
private volatile Double freeDiskThresholdHigh;
|
private volatile Double freeDiskThresholdHigh;
|
||||||
private volatile ByteSizeValue freeBytesThresholdLow;
|
private volatile ByteSizeValue freeBytesThresholdLow;
|
||||||
|
@ -86,6 +88,7 @@ public class DiskThresholdSettings {
|
||||||
|
|
||||||
private void setLowWatermark(String lowWatermark) {
|
private void setLowWatermark(String lowWatermark) {
|
||||||
// Watermark is expressed in terms of used data, but we need "free" data watermark
|
// Watermark is expressed in terms of used data, but we need "free" data watermark
|
||||||
|
this.lowWatermarkRaw = lowWatermark;
|
||||||
this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
|
this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
|
||||||
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
|
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
|
||||||
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
|
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
|
||||||
|
@ -93,11 +96,26 @@ public class DiskThresholdSettings {
|
||||||
|
|
||||||
private void setHighWatermark(String highWatermark) {
|
private void setHighWatermark(String highWatermark) {
|
||||||
// Watermark is expressed in terms of used data, but we need "free" data watermark
|
// Watermark is expressed in terms of used data, but we need "free" data watermark
|
||||||
|
this.highWatermarkRaw = highWatermark;
|
||||||
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
|
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
|
||||||
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
|
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
|
||||||
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
|
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the raw (uninterpreted) low watermark value as found in the settings.
|
||||||
|
*/
|
||||||
|
public String getLowWatermarkRaw() {
|
||||||
|
return lowWatermarkRaw;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the raw (uninterpreted) high watermark value as found in the settings.
|
||||||
|
*/
|
||||||
|
public String getHighWatermarkRaw() {
|
||||||
|
return highWatermarkRaw;
|
||||||
|
}
|
||||||
|
|
||||||
public Double getFreeDiskThresholdLow() {
|
public Double getFreeDiskThresholdLow() {
|
||||||
return freeDiskThresholdLow;
|
return freeDiskThresholdLow;
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,22 +87,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
||||||
|
|
||||||
private volatile Map<String, String[]> forcedAwarenessAttributes;
|
private volatile Map<String, String[]> forcedAwarenessAttributes;
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new {@link AwarenessAllocationDecider} instance
|
|
||||||
*/
|
|
||||||
public AwarenessAllocationDecider() {
|
|
||||||
this(Settings.Builder.EMPTY_SETTINGS);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new {@link AwarenessAllocationDecider} instance from given settings
|
|
||||||
*
|
|
||||||
* @param settings {@link Settings} to use
|
|
||||||
*/
|
|
||||||
public AwarenessAllocationDecider(Settings settings) {
|
|
||||||
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
|
||||||
}
|
|
||||||
|
|
||||||
public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
|
public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
|
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
|
||||||
|
@ -140,7 +124,9 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
||||||
|
|
||||||
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
|
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
|
||||||
if (awarenessAttributes.length == 0) {
|
if (awarenessAttributes.length == 0) {
|
||||||
return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled");
|
return allocation.decision(Decision.YES, NAME,
|
||||||
|
"allocation awareness is not enabled, set [%s] to enable it",
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
|
IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
|
||||||
|
@ -148,7 +134,10 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
||||||
for (String awarenessAttribute : awarenessAttributes) {
|
for (String awarenessAttribute : awarenessAttributes) {
|
||||||
// the node the shard exists on must be associated with an awareness attribute
|
// the node the shard exists on must be associated with an awareness attribute
|
||||||
if (!node.node().getAttributes().containsKey(awarenessAttribute)) {
|
if (!node.node().getAttributes().containsKey(awarenessAttribute)) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute: [%s]", awarenessAttribute);
|
return allocation.decision(Decision.NO, NAME,
|
||||||
|
"node does not contain the awareness attribute [%s]; required attributes [%s=%s]",
|
||||||
|
awarenessAttribute, CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(),
|
||||||
|
allocation.debugDecision() ? Strings.arrayToCommaDelimitedString(awarenessAttributes) : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// build attr_value -> nodes map
|
// build attr_value -> nodes map
|
||||||
|
@ -206,15 +195,14 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
||||||
// if we are above with leftover, then we know we are not good, even with mod
|
// if we are above with leftover, then we know we are not good, even with mod
|
||||||
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
|
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"there are too many shards on the node for attribute [%s], there are [%d] total shards for the index " +
|
"there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured " +
|
||||||
" and [%d] total attributes values, expected the node count [%d] to be lower or equal to the required " +
|
"shard copies for this shard id and [%d] total attribute values, expected the allocated shard count per " +
|
||||||
"number of shards per attribute [%d] plus leftover [%d]",
|
"attribute [%d] to be less than or equal to the upper bound of the required number of shards per attribute [%d]",
|
||||||
awarenessAttribute,
|
awarenessAttribute,
|
||||||
shardCount,
|
shardCount,
|
||||||
numberOfAttributes,
|
numberOfAttributes,
|
||||||
currentNodeCount,
|
currentNodeCount,
|
||||||
requiredCountPerAttribute,
|
requiredCountPerAttribute + leftoverPerAttribute);
|
||||||
leftoverPerAttribute);
|
|
||||||
}
|
}
|
||||||
// all is well, we are below or same as average
|
// all is well, we are below or same as average
|
||||||
if (currentNodeCount <= requiredCountPerAttribute) {
|
if (currentNodeCount <= requiredCountPerAttribute) {
|
||||||
|
|
|
@ -48,14 +48,15 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
||||||
|
|
||||||
public static final String NAME = "cluster_rebalance";
|
public static final String NAME = "cluster_rebalance";
|
||||||
|
private static final String CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE = "cluster.routing.allocation.allow_rebalance";
|
||||||
public static final Setting<ClusterRebalanceType> CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING =
|
public static final Setting<ClusterRebalanceType> CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING =
|
||||||
new Setting<>("cluster.routing.allocation.allow_rebalance", ClusterRebalanceType.INDICES_ALL_ACTIVE.name().toLowerCase(Locale.ROOT),
|
new Setting<>(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceType.INDICES_ALL_ACTIVE.toString(),
|
||||||
ClusterRebalanceType::parseString, Property.Dynamic, Property.NodeScope);
|
ClusterRebalanceType::parseString, Property.Dynamic, Property.NodeScope);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An enum representation for the configured re-balance type.
|
* An enum representation for the configured re-balance type.
|
||||||
*/
|
*/
|
||||||
public static enum ClusterRebalanceType {
|
public enum ClusterRebalanceType {
|
||||||
/**
|
/**
|
||||||
* Re-balancing is allowed once a shard replication group is active
|
* Re-balancing is allowed once a shard replication group is active
|
||||||
*/
|
*/
|
||||||
|
@ -80,6 +81,11 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
||||||
throw new IllegalArgumentException("Illegal value for " +
|
throw new IllegalArgumentException("Illegal value for " +
|
||||||
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
|
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return name().toLowerCase(Locale.ROOT);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private volatile ClusterRebalanceType type;
|
private volatile ClusterRebalanceType type;
|
||||||
|
@ -94,8 +100,7 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
||||||
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
|
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
|
||||||
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
|
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
|
||||||
}
|
}
|
||||||
logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(),
|
logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
|
||||||
type.toString().toLowerCase(Locale.ROOT));
|
|
||||||
|
|
||||||
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
|
||||||
}
|
}
|
||||||
|
@ -115,12 +120,14 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
||||||
// check if there are unassigned primaries.
|
// check if there are unassigned primaries.
|
||||||
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
|
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the cluster has unassigned primary shards and rebalance type is set to [%s]", type);
|
"the cluster has unassigned primary shards and [%s] is set to [%s]",
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
|
||||||
}
|
}
|
||||||
// check if there are initializing primaries that don't have a relocatingNodeId entry.
|
// check if there are initializing primaries that don't have a relocatingNodeId entry.
|
||||||
if ( allocation.routingNodes().hasInactivePrimaries() ) {
|
if ( allocation.routingNodes().hasInactivePrimaries() ) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the cluster has inactive primary shards and rebalance type is set to [%s]", type);
|
"the cluster has inactive primary shards and [%s] is set to [%s]",
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
return allocation.decision(Decision.YES, NAME, "all primary shards are active");
|
return allocation.decision(Decision.YES, NAME, "all primary shards are active");
|
||||||
|
@ -129,16 +136,18 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
||||||
// check if there are unassigned shards.
|
// check if there are unassigned shards.
|
||||||
if (allocation.routingNodes().hasUnassignedShards() ) {
|
if (allocation.routingNodes().hasUnassignedShards() ) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the cluster has unassigned shards and rebalance type is set to [%s]", type);
|
"the cluster has unassigned shards and [%s] is set to [%s]",
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
|
||||||
}
|
}
|
||||||
// in case all indices are assigned, are there initializing shards which
|
// in case all indices are assigned, are there initializing shards which
|
||||||
// are not relocating?
|
// are not relocating?
|
||||||
if ( allocation.routingNodes().hasInactiveShards() ) {
|
if ( allocation.routingNodes().hasInactiveShards() ) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the cluster has inactive shards and rebalance type is set to [%s]", type);
|
"the cluster has inactive shards and [%s] is set to [%s]",
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// type == Type.ALWAYS
|
// type == Type.ALWAYS
|
||||||
return allocation.decision(Decision.YES, NAME, "all shards are active, rebalance type is [%s]", type);
|
return allocation.decision(Decision.YES, NAME, "all shards are active");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,9 +66,11 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
|
||||||
}
|
}
|
||||||
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
|
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
|
||||||
if (relocatingShards >= clusterConcurrentRebalance) {
|
if (relocatingShards >= clusterConcurrentRebalance) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.THROTTLE, NAME,
|
||||||
"too many shards are concurrently rebalancing [%d], limit: [%d]",
|
"reached the limit of concurrently rebalancing shards [%d], [%s=%d]",
|
||||||
relocatingShards, clusterConcurrentRebalance);
|
relocatingShards,
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
|
||||||
|
clusterConcurrentRebalance);
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
|
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
|
||||||
|
|
|
@ -40,6 +40,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
|
||||||
|
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
|
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
|
||||||
* being allocated to has enough disk space.
|
* being allocated to has enough disk space.
|
||||||
|
@ -135,8 +138,10 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId());
|
diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the node is above the low watermark and has less than required [%s] free, free: [%s]",
|
"the node is above the low watermark [%s=%s], having less than the minimum required [%s] free space, actual free: [%s]",
|
||||||
diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytes));
|
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getLowWatermarkRaw(),
|
||||||
|
diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytes));
|
||||||
} else if (freeBytes > diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
|
} else if (freeBytes > diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
|
||||||
// Allow the shard to be allocated because it is primary that
|
// Allow the shard to be allocated because it is primary that
|
||||||
// has never been allocated if it's under the high watermark
|
// has never been allocated if it's under the high watermark
|
||||||
|
@ -146,7 +151,8 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId());
|
diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
"the node is above the low watermark, but this primary shard has never been allocated before");
|
"the node is above the low watermark, but less than the high watermark, and this primary shard has " +
|
||||||
|
"never been allocated before");
|
||||||
} else {
|
} else {
|
||||||
// Even though the primary has never been allocated, the node is
|
// Even though the primary has never been allocated, the node is
|
||||||
// above the high watermark, so don't allow allocating the shard
|
// above the high watermark, so don't allow allocating the shard
|
||||||
|
@ -156,9 +162,11 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId());
|
diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the node is above the high watermark even though this shard has never been allocated " +
|
"the node is above the high watermark [%s=%s], having less than the minimum required [%s] free space, " +
|
||||||
"and has less than required [%s] free on node, free: [%s]",
|
"actual free: [%s]",
|
||||||
diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes));
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getHighWatermarkRaw(),
|
||||||
|
diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,8 +180,10 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
|
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]",
|
"the node is above the low watermark [%s=%s], using more disk space than the maximum allowed [%s%%], " +
|
||||||
usedDiskThresholdLow, freeDiskPercentage);
|
"actual free: [%s%%]",
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getLowWatermarkRaw(), usedDiskThresholdLow, freeDiskPercentage);
|
||||||
} else if (freeDiskPercentage > diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
} else if (freeDiskPercentage > diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||||
// Allow the shard to be allocated because it is primary that
|
// Allow the shard to be allocated because it is primary that
|
||||||
// has never been allocated if it's under the high watermark
|
// has never been allocated if it's under the high watermark
|
||||||
|
@ -184,7 +194,8 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
|
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
"the node is above the low watermark, but this primary shard has never been allocated before");
|
"the node is above the low watermark, but less than the high watermark, and this primary shard has " +
|
||||||
|
"never been allocated before");
|
||||||
} else {
|
} else {
|
||||||
// Even though the primary has never been allocated, the node is
|
// Even though the primary has never been allocated, the node is
|
||||||
// above the high watermark, so don't allow allocating the shard
|
// above the high watermark, so don't allow allocating the shard
|
||||||
|
@ -195,9 +206,10 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
|
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"the node is above the high watermark even though this shard has never been allocated " +
|
"the node is above the high watermark [%s=%s], using more disk space than the maximum allowed [%s%%], " +
|
||||||
"and has more than allowed [%s%%] used disk, free: [%s%%]",
|
"actual free: [%s%%]",
|
||||||
usedDiskThresholdHigh, freeDiskPercentage);
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeDiskPercentage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,9 +222,11 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
"{} free bytes threshold ({} bytes free), preventing allocation",
|
"{} free bytes threshold ({} bytes free), preventing allocation",
|
||||||
node.nodeId(), diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytesAfterShard);
|
node.nodeId(), diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytesAfterShard);
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"after allocating the shard to this node, it would be above the high watermark " +
|
"allocating the shard to this node will bring the node above the high watermark [%s=%s] " +
|
||||||
"and have less than required [%s] free, free: [%s]",
|
"and cause it to have less than the minimum required [%s] of free space (free bytes after shard added: [%s])",
|
||||||
diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytesAfterShard));
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getHighWatermarkRaw(),
|
||||||
|
diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytesAfterShard));
|
||||||
}
|
}
|
||||||
if (freeSpaceAfterShard < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
if (freeSpaceAfterShard < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||||
logger.warn("after allocating, node [{}] would have more than the allowed " +
|
logger.warn("after allocating, node [{}] would have more than the allowed " +
|
||||||
|
@ -220,9 +234,10 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
node.nodeId(), Strings.format1Decimals(diskThresholdSettings.getFreeDiskThresholdHigh(), "%"),
|
node.nodeId(), Strings.format1Decimals(diskThresholdSettings.getFreeDiskThresholdHigh(), "%"),
|
||||||
Strings.format1Decimals(freeSpaceAfterShard, "%"));
|
Strings.format1Decimals(freeSpaceAfterShard, "%"));
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"after allocating the shard to this node, it would be above the high watermark " +
|
"allocating the shard to this node will bring the node above the high watermark [%s=%s] " +
|
||||||
"and have more than allowed [%s%%] used disk, free: [%s%%]",
|
"and cause it to use more disk space than the maximum allowed [%s%%] (free space after shard added: [%s%%])",
|
||||||
usedDiskThresholdLow, freeSpaceAfterShard);
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeSpaceAfterShard);
|
||||||
}
|
}
|
||||||
|
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
|
@ -264,9 +279,11 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId());
|
diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"after allocating this shard this node would be above the high watermark " +
|
"the shard cannot remain on this node because it is above the high watermark [%s=%s] " +
|
||||||
"and there would be less than required [%s] free on node, free: [%s]",
|
"and there is less than the required [%s] free space on node, actual free: [%s]",
|
||||||
diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes));
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getHighWatermarkRaw(),
|
||||||
|
diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes));
|
||||||
}
|
}
|
||||||
if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
|
@ -274,9 +291,11 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage, node.nodeId());
|
diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage, node.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"after allocating this shard this node would be above the high watermark " +
|
"the shard cannot remain on this node because it is above the high watermark [%s=%s] " +
|
||||||
"and there would be less than required [%s%%] free disk on node, free: [%s%%]",
|
"and there is less than the required [%s%%] free disk on node, actual free: [%s%%]",
|
||||||
diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage);
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(),
|
||||||
|
diskThresholdSettings.getHighWatermarkRaw(),
|
||||||
|
diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage);
|
||||||
}
|
}
|
||||||
|
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
|
|
|
@ -98,7 +98,8 @@ public class EnableAllocationDecider extends AllocationDecider {
|
||||||
@Override
|
@Override
|
||||||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||||
if (allocation.ignoreDisable()) {
|
if (allocation.ignoreDisable()) {
|
||||||
return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of allocation");
|
return allocation.decision(Decision.YES, NAME,
|
||||||
|
"explicitly ignoring any disabling of allocation due to manual allocation commands via the reroute API");
|
||||||
}
|
}
|
||||||
|
|
||||||
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
|
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
|
||||||
|
|
|
@ -64,12 +64,15 @@ public class FilterAllocationDecider extends AllocationDecider {
|
||||||
|
|
||||||
public static final String NAME = "filter";
|
public static final String NAME = "filter";
|
||||||
|
|
||||||
|
private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
|
||||||
|
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
|
||||||
|
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
|
||||||
public static final Setting<Settings> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING =
|
public static final Setting<Settings> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING =
|
||||||
Setting.groupSetting("cluster.routing.allocation.require.", Property.Dynamic, Property.NodeScope);
|
Setting.groupSetting(CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope);
|
||||||
public static final Setting<Settings> CLUSTER_ROUTING_INCLUDE_GROUP_SETTING =
|
public static final Setting<Settings> CLUSTER_ROUTING_INCLUDE_GROUP_SETTING =
|
||||||
Setting.groupSetting("cluster.routing.allocation.include.", Property.Dynamic, Property.NodeScope);
|
Setting.groupSetting(CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope);
|
||||||
public static final Setting<Settings> CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING =
|
public static final Setting<Settings> CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING =
|
||||||
Setting.groupSetting("cluster.routing.allocation.exclude.", Property.Dynamic, Property.NodeScope);
|
Setting.groupSetting(CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", Property.Dynamic, Property.NodeScope);
|
||||||
|
|
||||||
private volatile DiscoveryNodeFilters clusterRequireFilters;
|
private volatile DiscoveryNodeFilters clusterRequireFilters;
|
||||||
private volatile DiscoveryNodeFilters clusterIncludeFilters;
|
private volatile DiscoveryNodeFilters clusterIncludeFilters;
|
||||||
|
@ -96,8 +99,10 @@ public class FilterAllocationDecider extends AllocationDecider {
|
||||||
if (initialRecoveryFilters != null &&
|
if (initialRecoveryFilters != null &&
|
||||||
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) &&
|
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) &&
|
||||||
initialRecoveryFilters.match(node.node()) == false) {
|
initialRecoveryFilters.match(node.node()) == false) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node does not match index initial recovery filters [%s]",
|
String explanation = (shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) ?
|
||||||
indexMd.includeFilters());
|
"initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index" :
|
||||||
|
"initial allocation of the index is only allowed on nodes [%s]";
|
||||||
|
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return shouldFilter(shardRouting, node, allocation);
|
return shouldFilter(shardRouting, node, allocation);
|
||||||
|
@ -136,17 +141,20 @@ public class FilterAllocationDecider extends AllocationDecider {
|
||||||
private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
|
private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
|
||||||
if (indexMd.requireFilters() != null) {
|
if (indexMd.requireFilters() != null) {
|
||||||
if (!indexMd.requireFilters().match(node.node())) {
|
if (!indexMd.requireFilters().match(node.node())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node does not match index required filters [%s]", indexMd.requireFilters());
|
return allocation.decision(Decision.NO, NAME, "node does not match [%s] filters [%s]",
|
||||||
|
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (indexMd.includeFilters() != null) {
|
if (indexMd.includeFilters() != null) {
|
||||||
if (!indexMd.includeFilters().match(node.node())) {
|
if (!indexMd.includeFilters().match(node.node())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node does not match index include filters [%s]", indexMd.includeFilters());
|
return allocation.decision(Decision.NO, NAME, "node does not match [%s] filters [%s]",
|
||||||
|
IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (indexMd.excludeFilters() != null) {
|
if (indexMd.excludeFilters() != null) {
|
||||||
if (indexMd.excludeFilters().match(node.node())) {
|
if (indexMd.excludeFilters().match(node.node())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node matches index exclude filters [%s]", indexMd.excludeFilters());
|
return allocation.decision(Decision.NO, NAME, "node matches [%s] filters [%s]",
|
||||||
|
IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -155,17 +163,20 @@ public class FilterAllocationDecider extends AllocationDecider {
|
||||||
private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) {
|
private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) {
|
||||||
if (clusterRequireFilters != null) {
|
if (clusterRequireFilters != null) {
|
||||||
if (!clusterRequireFilters.match(node.node())) {
|
if (!clusterRequireFilters.match(node.node())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node does not match global required filters [%s]", clusterRequireFilters);
|
return allocation.decision(Decision.NO, NAME, "node does not match [%s] filters [%s]",
|
||||||
|
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX, clusterRequireFilters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (clusterIncludeFilters != null) {
|
if (clusterIncludeFilters != null) {
|
||||||
if (!clusterIncludeFilters.match(node.node())) {
|
if (!clusterIncludeFilters.match(node.node())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node does not match global include filters [%s]", clusterIncludeFilters);
|
return allocation.decision(Decision.NO, NAME, "node does not [%s] filters [%s]",
|
||||||
|
CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX, clusterIncludeFilters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (clusterExcludeFilters != null) {
|
if (clusterExcludeFilters != null) {
|
||||||
if (clusterExcludeFilters.match(node.node())) {
|
if (clusterExcludeFilters.match(node.node())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "node matches global exclude filters [%s]", clusterExcludeFilters);
|
return allocation.decision(Decision.NO, NAME, "node matches [%s] filters [%s]",
|
||||||
|
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX, clusterExcludeFilters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -37,8 +37,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
|
||||||
@Override
|
@Override
|
||||||
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
|
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
|
||||||
if (!allocation.routingNodes().allReplicasActive(shardRouting.shardId(), allocation.metaData())) {
|
if (!allocation.routingNodes().allReplicasActive(shardRouting.shardId(), allocation.metaData())) {
|
||||||
return allocation.decision(Decision.NO, NAME, "rebalancing can not occur if not all replicas are active in the cluster");
|
return allocation.decision(Decision.NO, NAME, "rebalancing is not allowed until all replicas in the cluster are active");
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME, "all replicas are active in the cluster, rebalancing can occur");
|
return allocation.decision(Decision.YES, NAME, "rebalancing is allowed as all replicas are active in the cluster");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,8 +61,15 @@ public class SameShardAllocationDecider extends AllocationDecider {
|
||||||
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
|
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
|
||||||
for (ShardRouting assignedShard : assignedShards) {
|
for (ShardRouting assignedShard : assignedShards) {
|
||||||
if (node.nodeId().equals(assignedShard.currentNodeId())) {
|
if (node.nodeId().equals(assignedShard.currentNodeId())) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
if (assignedShard.isSameAllocation(shardRouting)) {
|
||||||
"the shard cannot be allocated on the same node id [%s] on which it already exists", node.nodeId());
|
return allocation.decision(Decision.NO, NAME,
|
||||||
|
"the shard cannot be allocated to the node on which it already exists [%s]",
|
||||||
|
shardRouting.toString());
|
||||||
|
} else {
|
||||||
|
return allocation.decision(Decision.NO, NAME,
|
||||||
|
"the shard cannot be allocated to the same node on which a copy of the shard [%s] already exists",
|
||||||
|
assignedShard.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (sameHost) {
|
if (sameHost) {
|
||||||
|
@ -72,27 +79,32 @@ public class SameShardAllocationDecider extends AllocationDecider {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// check if its on the same host as the one we want to allocate to
|
// check if its on the same host as the one we want to allocate to
|
||||||
boolean checkNodeOnSameHost = false;
|
boolean checkNodeOnSameHostName = false;
|
||||||
|
boolean checkNodeOnSameHostAddress = false;
|
||||||
if (Strings.hasLength(checkNode.node().getHostAddress()) && Strings.hasLength(node.node().getHostAddress())) {
|
if (Strings.hasLength(checkNode.node().getHostAddress()) && Strings.hasLength(node.node().getHostAddress())) {
|
||||||
if (checkNode.node().getHostAddress().equals(node.node().getHostAddress())) {
|
if (checkNode.node().getHostAddress().equals(node.node().getHostAddress())) {
|
||||||
checkNodeOnSameHost = true;
|
checkNodeOnSameHostAddress = true;
|
||||||
}
|
}
|
||||||
} else if (Strings.hasLength(checkNode.node().getHostName()) && Strings.hasLength(node.node().getHostName())) {
|
} else if (Strings.hasLength(checkNode.node().getHostName()) && Strings.hasLength(node.node().getHostName())) {
|
||||||
if (checkNode.node().getHostName().equals(node.node().getHostName())) {
|
if (checkNode.node().getHostName().equals(node.node().getHostName())) {
|
||||||
checkNodeOnSameHost = true;
|
checkNodeOnSameHostName = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (checkNodeOnSameHost) {
|
if (checkNodeOnSameHostAddress || checkNodeOnSameHostName) {
|
||||||
for (ShardRouting assignedShard : assignedShards) {
|
for (ShardRouting assignedShard : assignedShards) {
|
||||||
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
|
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
|
||||||
|
String hostType = checkNodeOnSameHostAddress ? "address" : "name";
|
||||||
|
String host = checkNodeOnSameHostAddress ? node.node().getHostAddress() : node.node().getHostName();
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"shard cannot be allocated on the same host [%s] on which it already exists", node.nodeId());
|
"the shard cannot be allocated on host %s [%s], where it already exists on node [%s]; " +
|
||||||
|
"set [%s] to false to allow multiple nodes on the same host to hold the same shard copies",
|
||||||
|
hostType, host, node.nodeId(), CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME, "shard is not allocated to same node or host");
|
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same " + (sameHost ? "host" : "node"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,17 +107,18 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
|
if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
|
||||||
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
|
return allocation.decision(Decision.NO, NAME,
|
||||||
nodeShardCount, clusterShardLimit);
|
"too many shards [%d] allocated to this node, [%s=%d]",
|
||||||
|
nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
|
||||||
}
|
}
|
||||||
if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
|
if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
|
"too many shards [%d] allocated to this node for index [%s], [%s=%d]",
|
||||||
shardRouting.index(), indexShardCount, indexShardLimit);
|
indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
"the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
|
"the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]",
|
||||||
indexShardLimit, clusterShardLimit);
|
nodeShardCount, indexShardLimit, clusterShardLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -148,17 +149,18 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
|
||||||
// Subtle difference between the `canAllocate` and `canRemain` is that
|
// Subtle difference between the `canAllocate` and `canRemain` is that
|
||||||
// this checks > while canAllocate checks >=
|
// this checks > while canAllocate checks >=
|
||||||
if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
|
if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
|
||||||
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
|
return allocation.decision(Decision.NO, NAME,
|
||||||
nodeShardCount, clusterShardLimit);
|
"too many shards [%d] allocated to this node, [%s=%d]",
|
||||||
|
nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
|
||||||
}
|
}
|
||||||
if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
|
if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
|
||||||
return allocation.decision(Decision.NO, NAME,
|
return allocation.decision(Decision.NO, NAME,
|
||||||
"too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
|
"too many shards [%d] allocated to this node for index [%s], [%s=%d]",
|
||||||
shardRouting.index(), indexShardCount, indexShardLimit);
|
indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME,
|
return allocation.decision(Decision.YES, NAME,
|
||||||
"the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
|
"the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]",
|
||||||
indexShardLimit, clusterShardLimit);
|
nodeShardCount, indexShardLimit, clusterShardLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -182,10 +184,12 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
|
||||||
nodeShardCount++;
|
nodeShardCount++;
|
||||||
}
|
}
|
||||||
if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
|
if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
|
||||||
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
|
return allocation.decision(Decision.NO, NAME,
|
||||||
nodeShardCount, clusterShardLimit);
|
"too many shards [%d] allocated to this node, [%s=%d]",
|
||||||
|
nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME, "the shard count is under node limit [%d] of total shards per node",
|
return allocation.decision(Decision.YES, NAME,
|
||||||
clusterShardLimit);
|
"the shard count [%d] for this node is under the cluster level node limit [%d]",
|
||||||
|
nodeShardCount, clusterShardLimit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,15 +77,16 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
|
||||||
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null &&
|
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null &&
|
||||||
shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
|
shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]",
|
logger.trace("Preventing snapshotted shard [{}] from being moved away from node [{}]",
|
||||||
shardRouting.shardId(), shardSnapshotStatus.nodeId());
|
shardRouting.shardId(), shardSnapshotStatus.nodeId());
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",
|
return allocation.decision(Decision.THROTTLE, NAME,
|
||||||
shardRouting.shardId(), shardSnapshotStatus.nodeId());
|
"waiting for snapshotting of shard [%s] to complete on this node [%s]",
|
||||||
|
shardRouting.shardId(), shardSnapshotStatus.nodeId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return allocation.decision(Decision.YES, NAME, "the shard is not primary or relocation is disabled");
|
return allocation.decision(Decision.YES, NAME, "the shard is not being snapshotted");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,8 +126,9 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
|
||||||
}
|
}
|
||||||
if (primariesInRecovery >= primariesInitialRecoveries) {
|
if (primariesInRecovery >= primariesInitialRecoveries) {
|
||||||
// TODO: Should index creation not be throttled for primary shards?
|
// TODO: Should index creation not be throttled for primary shards?
|
||||||
return allocation.decision(THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
|
return allocation.decision(THROTTLE, NAME, "reached the limit of ongoing initial primary recoveries [%d], [%s=%d]",
|
||||||
primariesInRecovery, primariesInitialRecoveries);
|
primariesInRecovery, CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(),
|
||||||
|
primariesInitialRecoveries);
|
||||||
} else {
|
} else {
|
||||||
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
|
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
|
||||||
}
|
}
|
||||||
|
@ -138,8 +139,11 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
|
||||||
// Allocating a shard to this node will increase the incoming recoveries
|
// Allocating a shard to this node will increase the incoming recoveries
|
||||||
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
|
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
|
||||||
if (currentInRecoveries >= concurrentIncomingRecoveries) {
|
if (currentInRecoveries >= concurrentIncomingRecoveries) {
|
||||||
return allocation.decision(THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
|
return allocation.decision(THROTTLE, NAME,
|
||||||
currentInRecoveries, concurrentIncomingRecoveries);
|
"reached the limit of incoming shard recoveries [%d], [%s=%d] (can also be set via [%s])",
|
||||||
|
currentInRecoveries, CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(),
|
||||||
|
concurrentIncomingRecoveries,
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey());
|
||||||
} else {
|
} else {
|
||||||
// search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
|
// search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
|
||||||
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
|
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
|
||||||
|
@ -148,8 +152,13 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
|
||||||
}
|
}
|
||||||
int primaryNodeOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId());
|
int primaryNodeOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId());
|
||||||
if (primaryNodeOutRecoveries >= concurrentOutgoingRecoveries) {
|
if (primaryNodeOutRecoveries >= concurrentOutgoingRecoveries) {
|
||||||
return allocation.decision(THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
|
return allocation.decision(THROTTLE, NAME,
|
||||||
primaryNodeOutRecoveries, concurrentOutgoingRecoveries);
|
"reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " +
|
||||||
|
"[%s=%d] (can also be set via [%s])",
|
||||||
|
primaryNodeOutRecoveries, node.nodeId(),
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(),
|
||||||
|
concurrentOutgoingRecoveries,
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey());
|
||||||
} else {
|
} else {
|
||||||
return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
|
return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
|
||||||
primaryNodeOutRecoveries,
|
primaryNodeOutRecoveries,
|
||||||
|
|
|
@ -135,20 +135,20 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
|
||||||
|
|
||||||
assertEquals(d.type(), Decision.Type.NO);
|
assertEquals(d.type(), Decision.Type.NO);
|
||||||
if (noAttrNode.equals(nodeName)) {
|
if (noAttrNode.equals(nodeName)) {
|
||||||
assertThat(d.toString(), containsString("node does not match index include filters [foo:\"bar\"]"));
|
assertThat(d.toString(), containsString("node does not match [index.routing.allocation.include] filters [foo:\"bar\"]"));
|
||||||
assertNull(storeStatus);
|
assertNull(storeStatus);
|
||||||
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
|
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
|
||||||
explanation.getFinalExplanation());
|
explanation.getFinalExplanation());
|
||||||
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
|
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
|
||||||
} else if (barAttrNode.equals(nodeName)) {
|
} else if (barAttrNode.equals(nodeName)) {
|
||||||
assertThat(d.toString(), containsString("node does not match index include filters [foo:\"bar\"]"));
|
assertThat(d.toString(), containsString("node does not match [index.routing.allocation.include] filters [foo:\"bar\"]"));
|
||||||
barAttrWeight = weight;
|
barAttrWeight = weight;
|
||||||
assertNull(storeStatus);
|
assertNull(storeStatus);
|
||||||
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
|
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
|
||||||
explanation.getFinalExplanation());
|
explanation.getFinalExplanation());
|
||||||
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
|
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
|
||||||
} else if (fooBarAttrNode.equals(nodeName)) {
|
} else if (fooBarAttrNode.equals(nodeName)) {
|
||||||
assertThat(d.toString(), containsString("the shard cannot be allocated on the same node id"));
|
assertThat(d.toString(), containsString("the shard cannot be allocated to the same node"));
|
||||||
fooBarAttrWeight = weight;
|
fooBarAttrWeight = weight;
|
||||||
assertEquals(storeStatus.getAllocationStatus(),
|
assertEquals(storeStatus.getAllocationStatus(),
|
||||||
IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY);
|
IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY);
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.admin.cluster.allocation;
|
package org.elasticsearch.action.admin.cluster.allocation;
|
||||||
|
|
||||||
import org.elasticsearch.client.Requests;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
|
||||||
|
@ -51,12 +50,12 @@ public final class ClusterAllocationExplainTests extends ESSingleNodeTestCase {
|
||||||
assertEquals(Decision.Type.NO, d.type());
|
assertEquals(Decision.Type.NO, d.type());
|
||||||
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, fd);
|
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, fd);
|
||||||
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
|
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
|
||||||
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id"));
|
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated to the same node"));
|
||||||
assertTrue(d instanceof Decision.Multi);
|
assertTrue(d instanceof Decision.Multi);
|
||||||
Decision.Multi md = (Decision.Multi) d;
|
Decision.Multi md = (Decision.Multi) d;
|
||||||
Decision ssd = md.getDecisions().get(0);
|
Decision ssd = md.getDecisions().get(0);
|
||||||
assertEquals(Decision.Type.NO, ssd.type());
|
assertEquals(Decision.Type.NO, ssd.type());
|
||||||
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id"));
|
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated to the same node"));
|
||||||
Float weight = explanation.getWeight();
|
Float weight = explanation.getWeight();
|
||||||
assertNotNull("should have a weight", weight);
|
assertNotNull("should have a weight", weight);
|
||||||
|
|
||||||
|
@ -78,12 +77,14 @@ public final class ClusterAllocationExplainTests extends ESSingleNodeTestCase {
|
||||||
assertEquals(Decision.Type.NO, d.type());
|
assertEquals(Decision.Type.NO, d.type());
|
||||||
assertEquals(ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED, fd);
|
assertEquals(ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED, fd);
|
||||||
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
|
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
|
||||||
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id"));
|
assertTrue(d.toString(), d.toString().contains(
|
||||||
|
"NO(the shard cannot be allocated to the node on which it already exists [[test][0]"));
|
||||||
assertTrue(d instanceof Decision.Multi);
|
assertTrue(d instanceof Decision.Multi);
|
||||||
md = (Decision.Multi) d;
|
md = (Decision.Multi) d;
|
||||||
ssd = md.getDecisions().get(0);
|
ssd = md.getDecisions().get(0);
|
||||||
assertEquals(Decision.Type.NO, ssd.type());
|
assertEquals(Decision.Type.NO, ssd.type());
|
||||||
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id"));
|
assertTrue(ssd.toString(), ssd.toString().contains(
|
||||||
|
"NO(the shard cannot be allocated to the node on which it already exists [[test][0]"));
|
||||||
weight = explanation.getWeight();
|
weight = explanation.getWeight();
|
||||||
assertNotNull("should have a weight", weight);
|
assertNotNull("should have a weight", weight);
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
||||||
|
@ -74,12 +75,17 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
||||||
// after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node
|
// after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node
|
||||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
|
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
|
||||||
null, 0, false);
|
null, 0, false);
|
||||||
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
|
allocation.debugDecision(true);
|
||||||
state.getRoutingNodes().node("node2")
|
Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
|
||||||
, allocation), Decision.YES);
|
routingTable.index("idx").shard(0).primaryShard(),
|
||||||
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
|
state.getRoutingNodes().node("node2"), allocation);
|
||||||
state.getRoutingNodes().node("node1")
|
assertEquals(Type.YES, decision.type());
|
||||||
, allocation), Decision.NO);
|
assertEquals("node passes include/exclude/require filters", decision.getExplanation());
|
||||||
|
decision = (Decision.Single) filterAllocationDecider.canAllocate(
|
||||||
|
routingTable.index("idx").shard(0).primaryShard(),
|
||||||
|
state.getRoutingNodes().node("node1"), allocation);
|
||||||
|
assertEquals(Type.NO, decision.type());
|
||||||
|
assertEquals("initial allocation of the index is only allowed on nodes [_id:\"node2\"]", decision.getExplanation());
|
||||||
|
|
||||||
state = service.reroute(state, "try allocate again");
|
state = service.reroute(state, "try allocate again");
|
||||||
routingTable = state.routingTable();
|
routingTable = state.routingTable();
|
||||||
|
@ -114,12 +120,17 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
|
||||||
|
|
||||||
allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
|
allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
|
||||||
null, 0, false);
|
null, 0, false);
|
||||||
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
|
allocation.debugDecision(true);
|
||||||
state.getRoutingNodes().node("node2")
|
decision = (Decision.Single) filterAllocationDecider.canAllocate(
|
||||||
, allocation), Decision.YES);
|
routingTable.index("idx").shard(0).shards().get(0),
|
||||||
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
|
state.getRoutingNodes().node("node2"), allocation);
|
||||||
state.getRoutingNodes().node("node1")
|
assertEquals(Type.YES, decision.type());
|
||||||
, allocation), Decision.YES);
|
assertEquals("node passes include/exclude/require filters", decision.getExplanation());
|
||||||
|
decision = (Decision.Single) filterAllocationDecider.canAllocate(
|
||||||
|
routingTable.index("idx").shard(0).shards().get(0),
|
||||||
|
state.getRoutingNodes().node("node1"), allocation);
|
||||||
|
assertEquals(Type.YES, decision.type());
|
||||||
|
assertEquals("node passes include/exclude/require filters", decision.getExplanation());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
|
private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
@ -783,9 +784,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
strategy.reroute(clusterState, cmds, false, false);
|
strategy.reroute(clusterState, cmds, false, false);
|
||||||
fail("should not have been able to reroute the shard");
|
fail("should not have been able to reroute the shard");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
assertThat("can't allocated because there isn't enough room: " + e.getMessage(),
|
assertThat("can't be allocated because there isn't enough room: " + e.getMessage(),
|
||||||
e.getMessage(),
|
e.getMessage(),
|
||||||
containsString("the node is above the low watermark and has more than allowed [70.0%] used disk, free: [26.0%]"));
|
containsString("the node is above the low watermark [cluster.routing.allocation.disk.watermark.low=0.7], using " +
|
||||||
|
"more disk space than the maximum allowed [70.0%], actual free: [26.0%]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -852,8 +854,13 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo,
|
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo,
|
||||||
System.nanoTime(), false);
|
System.nanoTime(), false);
|
||||||
|
routingAllocation.debugDecision(true);
|
||||||
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||||
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"the shard cannot remain on this node because it is above the high watermark " +
|
||||||
|
"[cluster.routing.allocation.disk.watermark.high=70%] and there is less than the required [30.0%] free disk on node, " +
|
||||||
|
"actual free: [20.0%]"));
|
||||||
|
|
||||||
// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
|
// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
|
||||||
firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
|
firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
|
||||||
|
@ -874,10 +881,22 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime(),
|
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime(),
|
||||||
false);
|
false);
|
||||||
|
routingAllocation.debugDecision(true);
|
||||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||||
|
assertEquals("there is enough disk on this node for the shard to remain, free: [60b]",
|
||||||
|
((Decision.Single) decision).getExplanation());
|
||||||
decision = diskThresholdDecider.canAllocate(fooRouting, firstRoutingNode, routingAllocation);
|
decision = diskThresholdDecider.canAllocate(fooRouting, firstRoutingNode, routingAllocation);
|
||||||
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
||||||
|
if (fooRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE) {
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"the node is above the high watermark [cluster.routing.allocation.disk.watermark.high=70%], using more disk space than " +
|
||||||
|
"the maximum allowed [70.0%], actual free: [20.0%]"));
|
||||||
|
} else {
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"the node is above the low watermark [cluster.routing.allocation.disk.watermark.low=60%], using more disk space than " +
|
||||||
|
"the maximum allowed [60.0%], actual free: [20.0%]"));
|
||||||
|
}
|
||||||
|
|
||||||
// Creating AllocationService instance and the services it depends on...
|
// Creating AllocationService instance and the services it depends on...
|
||||||
ClusterInfoService cis = new ClusterInfoService() {
|
ClusterInfoService cis = new ClusterInfoService() {
|
||||||
|
@ -972,10 +991,12 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo,
|
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo,
|
||||||
System.nanoTime(), false);
|
System.nanoTime(), false);
|
||||||
|
routingAllocation.debugDecision(true);
|
||||||
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||||
|
|
||||||
// Two shards should start happily
|
// Two shards should start happily
|
||||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString("there is only a single data node present"));
|
||||||
ClusterInfoService cis = new ClusterInfoService() {
|
ClusterInfoService cis = new ClusterInfoService() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterInfo getClusterInfo() {
|
public ClusterInfo getClusterInfo() {
|
||||||
|
@ -1032,8 +1053,11 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build();
|
clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build();
|
||||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime(),
|
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime(),
|
||||||
false);
|
false);
|
||||||
|
routingAllocation.debugDecision(true);
|
||||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"there is enough disk on this node for the shard to remain, free: [60b]"));
|
||||||
|
|
||||||
result = strategy.reroute(clusterState, "reroute");
|
result = strategy.reroute(clusterState, "reroute");
|
||||||
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
|
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
|
||||||
|
|
|
@ -52,6 +52,7 @@ import java.util.HashSet;
|
||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for the DiskThresholdDecider
|
* Unit tests for the DiskThresholdDecider
|
||||||
|
@ -98,8 +99,15 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
|
||||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||||
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
|
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
|
||||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime(), false);
|
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime(), false);
|
||||||
assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
|
allocation.debugDecision(true);
|
||||||
assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
|
Decision decision = decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation);
|
||||||
|
assertEquals(mostAvailableUsage.toString(), Decision.Type.YES, decision.type());
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString("enough disk for shard on node"));
|
||||||
|
decision = decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation);
|
||||||
|
assertEquals(mostAvailableUsage.toString(), Decision.Type.NO, decision.type());
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"the node is above the high watermark [cluster.routing.allocation.disk.watermark.high=90%], using more disk space than " +
|
||||||
|
"the maximum allowed [90.0%]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCanRemainUsesLeastAvailableSpace() {
|
public void testCanRemainUsesLeastAvailableSpace() {
|
||||||
|
@ -165,8 +173,16 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
|
||||||
|
|
||||||
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build());
|
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build());
|
||||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime(), false);
|
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime(), false);
|
||||||
assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
|
allocation.debugDecision(true);
|
||||||
assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));
|
Decision decision = decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation);
|
||||||
|
assertEquals(Decision.Type.YES, decision.type());
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"there is enough disk on this node for the shard to remain, free: [10b]"));
|
||||||
|
decision = decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation);
|
||||||
|
assertEquals(Decision.Type.NO, decision.type());
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString("the shard cannot remain on this node because it is " +
|
||||||
|
"above the high watermark [cluster.routing.allocation.disk.watermark.high=90%] and there is less than the required [10.0%] " +
|
||||||
|
"free disk on node, actual free: [9.0%]"));
|
||||||
try {
|
try {
|
||||||
decider.canRemain(test_0, new RoutingNode("node_1", node_1), allocation);
|
decider.canRemain(test_0, new RoutingNode("node_1", node_1), allocation);
|
||||||
fail("not allocated on this node");
|
fail("not allocated on this node");
|
||||||
|
@ -180,9 +196,15 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
|
||||||
// not allocated on that node
|
// not allocated on that node
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals("can stay since allocated on a different path with enough space", Decision.YES, decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation));
|
decision = decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation);
|
||||||
|
assertEquals("can stay since allocated on a different path with enough space", Decision.Type.YES, decision.type());
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"this shard is not allocated on the most utilized disk and can remain"));
|
||||||
|
|
||||||
assertEquals("can stay since we don't have information about this shard", Decision.YES, decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation));
|
decision = decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation);
|
||||||
|
assertEquals("can stay since we don't have information about this shard", Decision.Type.YES, decision.type());
|
||||||
|
assertThat(((Decision.Single) decision).getExplanation(), containsString(
|
||||||
|
"this shard is not allocated on the most utilized disk and can remain"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue