[ALLOCATION] Weight deltas must be absolute deltas

In some situations the shard balanceing weight delta becomes negative. Yet,
a negative delta is always treated as `well balanced` which is wrong. I wasn't
able to reproduce the issue in any way other than useing the real world data
from issue #9023. This commit adds a fix for absolute deltas as well as a base
test class that allows to build tests or simulations from the cat API output.

Closes #9023
This commit is contained in:
Simon Willnauer 2015-01-06 00:22:51 +01:00
parent ca68136628
commit 4900f52619
8 changed files with 329 additions and 53 deletions

View File

@ -78,14 +78,14 @@ due to forced awareness or allocation filtering.
`cluster.routing.allocation.balance.index`:: `cluster.routing.allocation.balance.index`::
Defines a factor to the number of shards per index allocated Defines a factor to the number of shards per index allocated
on a specific node (float). Defaults to `0.5f`. Raising this raises the on a specific node (float). Defaults to `0.55f`. Raising this raises the
tendency to equalize the number of shards per index across all nodes in tendency to equalize the number of shards per index across all nodes in
the cluster. the cluster.
`cluster.routing.allocation.balance.primary`:: `cluster.routing.allocation.balance.primary`::
Defines a weight factor for the number of primaries of a specific index Defines a weight factor for the number of primaries of a specific index
allocated on a node (float). `0.05f`. Raising this raises the tendency allocated on a node (float). `0.00f`. Raising this raises the tendency
to equalize the number of primary shards across all nodes in the cluster. to equalize the number of primary shards across all nodes in the cluster. deprecated[1.3.8]
`cluster.routing.allocation.balance.threshold`:: `cluster.routing.allocation.balance.threshold`::
Minimal optimization value of operations that should be performed (non Minimal optimization value of operations that should be performed (non

View File

@ -259,13 +259,13 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
/** /**
* Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states} * Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states}
* *
* @param states a set of {@link ShardRoutingState states} * @param state {@link ShardRoutingState} to retrieve
* @return a {@link List} of shards that match one of the given {@link ShardRoutingState states} * @return a {@link List} of shards that match one of the given {@link ShardRoutingState states}
*/ */
public List<ShardRouting> shardsWithState(ShardRoutingState... states) { public List<ShardRouting> shardsWithState(ShardRoutingState state) {
List<ShardRouting> shards = newArrayList(); List<ShardRouting> shards = newArrayList();
for (IndexShardRoutingTable shardRoutingTable : this) { for (IndexShardRoutingTable shardRoutingTable : this) {
shards.addAll(shardRoutingTable.shardsWithState(states)); shards.addAll(shardRoutingTable.shardsWithState(state));
} }
return shards; return shards;
} }

View File

@ -476,13 +476,14 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return shards; return shards;
} }
public List<ShardRouting> shardsWithState(ShardRoutingState... states) { public List<ShardRouting> shardsWithState(ShardRoutingState state) {
if (state == ShardRoutingState.INITIALIZING) {
return allInitializingShards;
}
List<ShardRouting> shards = newArrayList(); List<ShardRouting> shards = newArrayList();
for (ShardRouting shardEntry : this) { for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) { if (shardEntry.state() == state) {
if (shardEntry.state() == state) { shards.add(shardEntry);
shards.add(shardEntry);
}
} }
} }
return shards; return shards;

View File

@ -108,10 +108,10 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return validation; return validation;
} }
public List<ShardRouting> shardsWithState(ShardRoutingState... states) { public List<ShardRouting> shardsWithState(ShardRoutingState state) {
List<ShardRouting> shards = newArrayList(); List<ShardRouting> shards = newArrayList();
for (IndexRoutingTable indexRoutingTable : this) { for (IndexRoutingTable indexRoutingTable : this) {
shards.addAll(indexRoutingTable.shardsWithState(states)); shards.addAll(indexRoutingTable.shardsWithState(state));
} }
return shards; return shards;
} }

View File

@ -71,9 +71,18 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard"; public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard";
public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary"; public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary";
private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.5f; private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.55f;
private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f; private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f;
private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.05f; /**
* The primary balance factor was introduces as a tie-breaker to make the initial allocation
* more deterministic. Yet other mechanism have been added ensure that the algorithm is more deterministic such that this
* setting is not needed anymore. Additionally, this setting was abused to balance shards based on their primary flag which can lead
* to unexpected behavior when allocating or balancing the shards.
*
* @deprecated the threshold primary balance factor is deprecated and should not be used.
*/
@Deprecated
private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.0f;
class ApplySettings implements NodeSettingsService.Listener { class ApplySettings implements NodeSettingsService.Listener {
@Override @Override
@ -191,44 +200,23 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private final float indexBalance; private final float indexBalance;
private final float shardBalance; private final float shardBalance;
private final float primaryBalance; private final float primaryBalance;
private final EnumMap<Operation, float[]> thetaMap = new EnumMap<>(Operation.class); private final float[] theta;
public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) { public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) {
float sum = indexBalance + shardBalance + primaryBalance; float sum = indexBalance + shardBalance + primaryBalance;
if (sum <= 0.0f) { if (sum <= 0.0f) {
throw new ElasticsearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); throw new ElasticsearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
} }
final float[] defaultTheta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum}; theta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum};
for (Operation operation : Operation.values()) {
switch (operation) {
case THRESHOLD_CHECK:
sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
thetaMap.put(operation, defaultTheta);
} else {
thetaMap.put(operation, new float[]{shardBalance / sum, indexBalance / sum, 0});
}
break;
case BALANCE:
case ALLOCATE:
case MOVE:
thetaMap.put(operation, defaultTheta);
break;
default:
assert false;
}
}
this.indexBalance = indexBalance; this.indexBalance = indexBalance;
this.shardBalance = shardBalance; this.shardBalance = shardBalance;
this.primaryBalance = primaryBalance; this.primaryBalance = primaryBalance;
} }
public float weight(Operation operation, Balancer balancer, ModelNode node, String index) { public float weight(Operation operation, Balancer balancer, ModelNode node, String index) {
final float weightShard = (node.numShards() - balancer.avgShardsPerNode()); final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = (node.numShards(index) - balancer.avgShardsPerNode(index)); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode()); final float weightPrimary = node.numPrimaries() - balancer.avgPrimariesPerNode();
final float[] theta = thetaMap.get(operation);
assert theta != null;
return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary; return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary;
} }
@ -250,13 +238,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/** /**
* Provided during move operation. * Provided during move operation.
*/ */
MOVE, MOVE
/**
* Provided when the weight delta is checked against the configured threshold.
* This can be used to ignore tie-breaking weight factors that should not
* solely trigger a relocation unless the delta is above the threshold.
*/
THRESHOLD_CHECK
} }
/** /**
@ -348,11 +330,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return allocateUnassigned(unassigned, routing.ignoredUnassigned()); return allocateUnassigned(unassigned, routing.ignoredUnassigned());
} }
private static float absDelta(float lower, float higher) {
assert higher >= lower : higher + " lt " + lower +" but was expected to be gte";
return Math.abs(higher - lower);
}
private static boolean lessThan(float delta, float threshold) { private static boolean lessThan(float delta, float threshold) {
/* deltas close to the threshold are "rounded" to the threshold manually /* deltas close to the threshold are "rounded" to the threshold manually
to prevent floating point problems if the delta is very close to the to prevent floating point problems if the delta is very close to the
threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ threshold ie. 1.000000002 which can trigger unnecessary balance actions*/
return delta <= threshold + 0.001f; return delta <= (threshold + 0.001f);
} }
/** /**
@ -393,11 +380,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final ModelNode maxNode = modelNodes[highIdx]; final ModelNode maxNode = modelNodes[highIdx];
advance_range: advance_range:
if (maxNode.numShards(index) > 0) { if (maxNode.numShards(index) > 0) {
float delta = weights[highIdx] - weights[lowIdx]; float delta = absDelta(weights[lowIdx], weights[highIdx]);
delta = lessThan(delta, threshold) ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
if (lessThan(delta, threshold)) { if (lessThan(delta, threshold)) {
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta? if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
&& (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all && (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all
) { ) {
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
* due to some allocation decider restrictions like zone awareness. if one zone has for instance * due to some allocation decider restrictions like zone awareness. if one zone has for instance
@ -747,7 +733,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final RoutingNode node = routingNodes.node(minNode.getNodeId()); final RoutingNode node = routingNodes.node(minNode.getNodeId());
if (deciders.canAllocate(node, allocation).type() != Type.YES) { if (deciders.canAllocate(node, allocation).type() != Type.YES) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Can not allocate on node [{}] remove from round decisin [{}]", node, decision.type()); logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
} }
throttledNodes.add(minNode); throttledNodes.add(minNode);
} }

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
/**
* see issue #9023
*/
public class BalanceUnbalancedClusterTest extends CatAllocationTestBase {
@Override
protected Path getCatPath() throws IOException {
Path tmp = newTempDirPath();
try (InputStream stream = Files.newInputStream(getResourcePath("/org/elasticsearch/cluster/routing/issue_9023.zip"))) {
TestUtil.unzip(stream, tmp);
}
return tmp.resolve("issue_9023");
}
@Override
protected ClusterState allocateNew(ClusterState state) {
String index = "tweets-2014-12-29:00";
AllocationService strategy = createAllocationService(settingsBuilder()
.build());
MetaData metaData = MetaData.builder(state.metaData())
.put(IndexMetaData.builder(index).settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder(state.routingTable())
.addAsNew(metaData.index(index))
.build();
ClusterState clusterState = ClusterState.builder(state).metaData(metaData).routingTable(routingTable).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
while (true) {
if (routingTable.shardsWithState(INITIALIZING).isEmpty()) {
break;
}
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
}
Map<String, Integer> counts = new HashMap<>();
for (IndexShardRoutingTable table : routingTable.index(index)) {
for (ShardRouting r : table) {
String s = r.currentNodeId();
Integer count = counts.get(s);
if (count == null) {
count = 0;
}
count++;
counts.put(s, count);
}
}
for (Map.Entry<String, Integer> count : counts.entrySet()) {
// we have 10 shards and 4 nodes so 2 nodes have 3 shards and 2 nodes have 2 shards
assertTrue("Node: " + count.getKey() + " has shard mismatch: " + count.getValue(), count.getValue() >= 2);
assertTrue("Node: " + count.getKey() + " has shard mismatch: " + count.getValue(), count.getValue() <= 3);
}
return clusterState;
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.base.Charsets;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Ignore;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
/**
* A base testscase that allows to run tests based on the output of the CAT API
* The input is a line based cat/shards output like:
* kibana-int 0 p STARTED 2 24.8kb 10.202.245.2 r5-9-35
*
* the test builds up a clusterstate from the cat input and optionally runs a full balance on it.
* This can be used to debug cluster allocation decisions.
*/
@Ignore
public abstract class CatAllocationTestBase extends ElasticsearchAllocationTestCase {
protected abstract Path getCatPath() throws IOException;
@Test
public void run() throws IOException {
Set<String> nodes = new HashSet<>();
Map<String, Idx> indices = new HashMap<>();
try (BufferedReader reader = Files.newBufferedReader(getCatPath(), Charsets.UTF_8)) {
String line = null;
// regexp FTW
Pattern pattern = Pattern.compile("^(.+)\\s+(\\d)\\s+([rp])\\s+(STARTED|RELOCATING|INITIALIZING|UNASSIGNED)\\s+\\d+\\s+[0-9.a-z]+\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+).*$");
while((line = reader.readLine()) != null) {
final Matcher matcher;
if ((matcher = pattern.matcher(line)).matches()) {
final String index = matcher.group(1);
Idx idx = indices.get(index);
if (idx == null) {
idx = new Idx(index);
indices.put(index, idx);
}
final int shard = Integer.parseInt(matcher.group(2));
final boolean primary = matcher.group(3).equals("p");
ShardRoutingState state = ShardRoutingState.valueOf(matcher.group(4));
String ip = matcher.group(5);
nodes.add(ip);
MutableShardRouting routing = new MutableShardRouting(index, shard, ip, primary, state, 1);
idx.add(routing);
logger.debug("Add routing {}", routing);
} else {
fail("can't read line: " + line);
}
}
}
logger.info("Building initial routing table");
MetaData.Builder builder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
for(Idx idx : indices.values()) {
IndexMetaData idxMeta = IndexMetaData.builder(idx.name).settings(settings(Version.CURRENT)).numberOfShards(idx.numShards()).numberOfReplicas(idx.numReplicas()).build();
builder.put(idxMeta, false);
IndexRoutingTable.Builder tableBuilder = new IndexRoutingTable.Builder(idx.name).initializeAsRecovery(idxMeta);
Map<Integer, IndexShardRoutingTable> shardIdToRouting = new HashMap<>();
for (MutableShardRouting r : idx.routing) {
IndexShardRoutingTable refData = new IndexShardRoutingTable.Builder(new ShardId(idx.name, r.id()), true).addShard(r).build();
if (shardIdToRouting.containsKey(r.getId())) {
refData = new IndexShardRoutingTable.Builder(shardIdToRouting.get(r.getId())).addShard(r).build();
}
shardIdToRouting.put(r.getId(), refData);
}
for (IndexShardRoutingTable t: shardIdToRouting.values()) {
tableBuilder.addIndexShard(t);
}
IndexRoutingTable table = tableBuilder.build();
routingTableBuilder.add(table);
}
MetaData metaData = builder.build();
RoutingTable routingTable = routingTableBuilder.build();
DiscoveryNodes.Builder builderDiscoNodes = DiscoveryNodes.builder();
for (String node : nodes) {
builderDiscoNodes.put(newNode(node));
}
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).nodes(builderDiscoNodes.build()).build();
if (balanceFirst()) {
clusterState = rebalance(clusterState);
}
clusterState = allocateNew(clusterState);
}
protected abstract ClusterState allocateNew(ClusterState clusterState);
protected boolean balanceFirst() {
return true;
}
private ClusterState rebalance(ClusterState clusterState) {
RoutingTable routingTable;AllocationService strategy = createAllocationService(settingsBuilder()
.build());
RoutingAllocation.Result reroute = strategy.reroute(clusterState);
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingTable = clusterState.routingTable();
int numRelocations = 0;
while (true) {
List<ShardRouting> initializing = routingTable.shardsWithState(INITIALIZING);
if (initializing.isEmpty()) {
break;
}
logger.debug(initializing.toString());
numRelocations += initializing.size();
routingTable = strategy.applyStartedShards(clusterState, initializing).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
}
logger.debug("--> num relocations to get balance: " + numRelocations);
return clusterState;
}
public class Idx {
final String name;
final List<MutableShardRouting> routing = new ArrayList<>();
public Idx(String name) {
this.name = name;
}
public void add(MutableShardRouting r) {
routing.add(r);
}
public int numReplicas() {
int count = 0;
for (MutableShardRouting msr : routing) {
if (msr.primary() == false && msr.id()==0) {
count++;
}
}
return count;
}
public int numShards() {
int max = 0;
for (MutableShardRouting msr : routing) {
if (msr.primary()) {
max = Math.max(msr.getId()+1, max);
}
}
return max;
}
}
}