diff --git a/docs/reference/cluster/update-settings.asciidoc b/docs/reference/cluster/update-settings.asciidoc index 42571fbb0fe..c3863f86cf2 100644 --- a/docs/reference/cluster/update-settings.asciidoc +++ b/docs/reference/cluster/update-settings.asciidoc @@ -78,14 +78,14 @@ due to forced awareness or allocation filtering. `cluster.routing.allocation.balance.index`:: Defines a factor to the number of shards per index allocated - on a specific node (float). Defaults to `0.5f`. Raising this raises the + on a specific node (float). Defaults to `0.55f`. Raising this raises the tendency to equalize the number of shards per index across all nodes in the cluster. `cluster.routing.allocation.balance.primary`:: Defines a weight factor for the number of primaries of a specific index - allocated on a node (float). `0.05f`. Raising this raises the tendency - to equalize the number of primary shards across all nodes in the cluster. + allocated on a node (float). `0.00f`. Raising this raises the tendency + to equalize the number of primary shards across all nodes in the cluster. deprecated[1.3.8] `cluster.routing.allocation.balance.threshold`:: Minimal optimization value of operations that should be performed (non diff --git a/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 175620edb1a..5f0356d3572 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -259,13 +259,13 @@ public class IndexRoutingTable implements Iterable { /** * Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states} * - * @param states a set of {@link ShardRoutingState states} + * @param state {@link ShardRoutingState} to retrieve * @return a {@link List} of shards that match one of the given {@link ShardRoutingState states} */ - public List shardsWithState(ShardRoutingState... states) { + public List shardsWithState(ShardRoutingState state) { List shards = newArrayList(); for (IndexShardRoutingTable shardRoutingTable : this) { - shards.addAll(shardRoutingTable.shardsWithState(states)); + shards.addAll(shardRoutingTable.shardsWithState(state)); } return shards; } diff --git a/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index ceccd3564c2..00e50b76129 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -476,13 +476,14 @@ public class IndexShardRoutingTable implements Iterable { return shards; } - public List shardsWithState(ShardRoutingState... states) { + public List shardsWithState(ShardRoutingState state) { + if (state == ShardRoutingState.INITIALIZING) { + return allInitializingShards; + } List shards = newArrayList(); for (ShardRouting shardEntry : this) { - for (ShardRoutingState state : states) { - if (shardEntry.state() == state) { - shards.add(shardEntry); - } + if (shardEntry.state() == state) { + shards.add(shardEntry); } } return shards; diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 578426100fe..6f44a1d11fc 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -108,10 +108,10 @@ public class RoutingTable implements Iterable { return validation; } - public List shardsWithState(ShardRoutingState... states) { + public List shardsWithState(ShardRoutingState state) { List shards = newArrayList(); for (IndexRoutingTable indexRoutingTable : this) { - shards.addAll(indexRoutingTable.shardsWithState(states)); + shards.addAll(indexRoutingTable.shardsWithState(state)); } return shards; } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5484498237d..90b7cdb95b1 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -71,9 +71,18 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard"; public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary"; - private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.5f; + private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.55f; private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f; - private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.05f; + /** + * The primary balance factor was introduces as a tie-breaker to make the initial allocation + * more deterministic. Yet other mechanism have been added ensure that the algorithm is more deterministic such that this + * setting is not needed anymore. Additionally, this setting was abused to balance shards based on their primary flag which can lead + * to unexpected behavior when allocating or balancing the shards. + * + * @deprecated the threshold primary balance factor is deprecated and should not be used. + */ + @Deprecated + private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.0f; class ApplySettings implements NodeSettingsService.Listener { @Override @@ -191,44 +200,23 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private final float indexBalance; private final float shardBalance; private final float primaryBalance; - private final EnumMap thetaMap = new EnumMap<>(Operation.class); + private final float[] theta; public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) { float sum = indexBalance + shardBalance + primaryBalance; if (sum <= 0.0f) { throw new ElasticsearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } - final float[] defaultTheta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum}; - for (Operation operation : Operation.values()) { - switch (operation) { - case THRESHOLD_CHECK: - sum = indexBalance + shardBalance; - if (sum <= 0.0f) { - thetaMap.put(operation, defaultTheta); - } else { - thetaMap.put(operation, new float[]{shardBalance / sum, indexBalance / sum, 0}); - } - break; - case BALANCE: - case ALLOCATE: - case MOVE: - thetaMap.put(operation, defaultTheta); - break; - default: - assert false; - } - } + theta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum}; this.indexBalance = indexBalance; this.shardBalance = shardBalance; this.primaryBalance = primaryBalance; } public float weight(Operation operation, Balancer balancer, ModelNode node, String index) { - final float weightShard = (node.numShards() - balancer.avgShardsPerNode()); - final float weightIndex = (node.numShards(index) - balancer.avgShardsPerNode(index)); - final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode()); - final float[] theta = thetaMap.get(operation); - assert theta != null; + final float weightShard = node.numShards() - balancer.avgShardsPerNode(); + final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); + final float weightPrimary = node.numPrimaries() - balancer.avgPrimariesPerNode(); return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary; } @@ -250,13 +238,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards /** * Provided during move operation. */ - MOVE, - /** - * Provided when the weight delta is checked against the configured threshold. - * This can be used to ignore tie-breaking weight factors that should not - * solely trigger a relocation unless the delta is above the threshold. - */ - THRESHOLD_CHECK + MOVE } /** @@ -348,11 +330,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return allocateUnassigned(unassigned, routing.ignoredUnassigned()); } + private static float absDelta(float lower, float higher) { + assert higher >= lower : higher + " lt " + lower +" but was expected to be gte"; + return Math.abs(higher - lower); + } + private static boolean lessThan(float delta, float threshold) { /* deltas close to the threshold are "rounded" to the threshold manually to prevent floating point problems if the delta is very close to the threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ - return delta <= threshold + 0.001f; + return delta <= (threshold + 0.001f); } /** @@ -393,11 +380,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final ModelNode maxNode = modelNodes[highIdx]; advance_range: if (maxNode.numShards(index) > 0) { - float delta = weights[highIdx] - weights[lowIdx]; - delta = lessThan(delta, threshold) ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode); + float delta = absDelta(weights[lowIdx], weights[highIdx]); if (lessThan(delta, threshold)) { if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta? - && (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all + && (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all ) { /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible * due to some allocation decider restrictions like zone awareness. if one zone has for instance @@ -747,7 +733,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final RoutingNode node = routingNodes.node(minNode.getNodeId()); if (deciders.canAllocate(node, allocation).type() != Type.YES) { if (logger.isTraceEnabled()) { - logger.trace("Can not allocate on node [{}] remove from round decisin [{}]", node, decision.type()); + logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type()); } throttledNodes.add(minNode); } diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceUnbalancedClusterTest.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceUnbalancedClusterTest.java new file mode 100644 index 00000000000..fc3fb6af17d --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceUnbalancedClusterTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation; + +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; + +/** + * see issue #9023 + */ +public class BalanceUnbalancedClusterTest extends CatAllocationTestBase { + + @Override + protected Path getCatPath() throws IOException { + Path tmp = newTempDirPath(); + try (InputStream stream = Files.newInputStream(getResourcePath("/org/elasticsearch/cluster/routing/issue_9023.zip"))) { + TestUtil.unzip(stream, tmp); + } + return tmp.resolve("issue_9023"); + } + + @Override + protected ClusterState allocateNew(ClusterState state) { + String index = "tweets-2014-12-29:00"; + AllocationService strategy = createAllocationService(settingsBuilder() + .build()); + MetaData metaData = MetaData.builder(state.metaData()) + .put(IndexMetaData.builder(index).settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder(state.routingTable()) + .addAsNew(metaData.index(index)) + .build(); + + ClusterState clusterState = ClusterState.builder(state).metaData(metaData).routingTable(routingTable).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + while (true) { + if (routingTable.shardsWithState(INITIALIZING).isEmpty()) { + break; + } + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + Map counts = new HashMap<>(); + for (IndexShardRoutingTable table : routingTable.index(index)) { + for (ShardRouting r : table) { + String s = r.currentNodeId(); + Integer count = counts.get(s); + if (count == null) { + count = 0; + } + count++; + counts.put(s, count); + } + } + for (Map.Entry count : counts.entrySet()) { + // we have 10 shards and 4 nodes so 2 nodes have 3 shards and 2 nodes have 2 shards + assertTrue("Node: " + count.getKey() + " has shard mismatch: " + count.getValue(), count.getValue() >= 2); + assertTrue("Node: " + count.getKey() + " has shard mismatch: " + count.getValue(), count.getValue() <= 3); + + } + return clusterState; + } + +} diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestBase.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestBase.java new file mode 100644 index 00000000000..15726d1ea05 --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestBase.java @@ -0,0 +1,191 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import com.google.common.base.Charsets; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.internal.InternalEngine; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ElasticsearchAllocationTestCase; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; + +/** + * A base testscase that allows to run tests based on the output of the CAT API + * The input is a line based cat/shards output like: + * kibana-int 0 p STARTED 2 24.8kb 10.202.245.2 r5-9-35 + * + * the test builds up a clusterstate from the cat input and optionally runs a full balance on it. + * This can be used to debug cluster allocation decisions. + */ +@Ignore +public abstract class CatAllocationTestBase extends ElasticsearchAllocationTestCase { + + protected abstract Path getCatPath() throws IOException; + + @Test + public void run() throws IOException { + Set nodes = new HashSet<>(); + Map indices = new HashMap<>(); + try (BufferedReader reader = Files.newBufferedReader(getCatPath(), Charsets.UTF_8)) { + String line = null; + // regexp FTW + Pattern pattern = Pattern.compile("^(.+)\\s+(\\d)\\s+([rp])\\s+(STARTED|RELOCATING|INITIALIZING|UNASSIGNED)\\s+\\d+\\s+[0-9.a-z]+\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+).*$"); + while((line = reader.readLine()) != null) { + final Matcher matcher; + if ((matcher = pattern.matcher(line)).matches()) { + final String index = matcher.group(1); + Idx idx = indices.get(index); + if (idx == null) { + idx = new Idx(index); + indices.put(index, idx); + } + final int shard = Integer.parseInt(matcher.group(2)); + final boolean primary = matcher.group(3).equals("p"); + ShardRoutingState state = ShardRoutingState.valueOf(matcher.group(4)); + String ip = matcher.group(5); + nodes.add(ip); + MutableShardRouting routing = new MutableShardRouting(index, shard, ip, primary, state, 1); + idx.add(routing); + logger.debug("Add routing {}", routing); + } else { + fail("can't read line: " + line); + } + } + + } + + logger.info("Building initial routing table"); + MetaData.Builder builder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for(Idx idx : indices.values()) { + IndexMetaData idxMeta = IndexMetaData.builder(idx.name).settings(settings(Version.CURRENT)).numberOfShards(idx.numShards()).numberOfReplicas(idx.numReplicas()).build(); + builder.put(idxMeta, false); + IndexRoutingTable.Builder tableBuilder = new IndexRoutingTable.Builder(idx.name).initializeAsRecovery(idxMeta); + Map shardIdToRouting = new HashMap<>(); + for (MutableShardRouting r : idx.routing) { + IndexShardRoutingTable refData = new IndexShardRoutingTable.Builder(new ShardId(idx.name, r.id()), true).addShard(r).build(); + if (shardIdToRouting.containsKey(r.getId())) { + refData = new IndexShardRoutingTable.Builder(shardIdToRouting.get(r.getId())).addShard(r).build(); + } + shardIdToRouting.put(r.getId(), refData); + + } + for (IndexShardRoutingTable t: shardIdToRouting.values()) { + tableBuilder.addIndexShard(t); + } + IndexRoutingTable table = tableBuilder.build(); + routingTableBuilder.add(table); + } + MetaData metaData = builder.build(); + + RoutingTable routingTable = routingTableBuilder.build(); + DiscoveryNodes.Builder builderDiscoNodes = DiscoveryNodes.builder(); + for (String node : nodes) { + builderDiscoNodes.put(newNode(node)); + } + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).nodes(builderDiscoNodes.build()).build(); + if (balanceFirst()) { + clusterState = rebalance(clusterState); + } + clusterState = allocateNew(clusterState); + } + + protected abstract ClusterState allocateNew(ClusterState clusterState); + + protected boolean balanceFirst() { + return true; + } + + private ClusterState rebalance(ClusterState clusterState) { + RoutingTable routingTable;AllocationService strategy = createAllocationService(settingsBuilder() + .build()); + RoutingAllocation.Result reroute = strategy.reroute(clusterState); + routingTable = reroute.routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingTable = clusterState.routingTable(); + int numRelocations = 0; + while (true) { + List initializing = routingTable.shardsWithState(INITIALIZING); + if (initializing.isEmpty()) { + break; + } + logger.debug(initializing.toString()); + numRelocations += initializing.size(); + routingTable = strategy.applyStartedShards(clusterState, initializing).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + logger.debug("--> num relocations to get balance: " + numRelocations); + return clusterState; + } + + + + public class Idx { + final String name; + final List routing = new ArrayList<>(); + + public Idx(String name) { + this.name = name; + } + + + public void add(MutableShardRouting r) { + routing.add(r); + } + + public int numReplicas() { + int count = 0; + for (MutableShardRouting msr : routing) { + if (msr.primary() == false && msr.id()==0) { + count++; + } + } + return count; + } + + public int numShards() { + int max = 0; + for (MutableShardRouting msr : routing) { + if (msr.primary()) { + max = Math.max(msr.getId()+1, max); + } + } + return max; + } + } +} diff --git a/src/test/resources/org/elasticsearch/cluster/routing/issue_9023.zip b/src/test/resources/org/elasticsearch/cluster/routing/issue_9023.zip new file mode 100644 index 00000000000..951573d5f25 Binary files /dev/null and b/src/test/resources/org/elasticsearch/cluster/routing/issue_9023.zip differ