Merge pull request #17028 from ywelsch/enhance/simplify-balancer
Simplify shard balancer interface
This commit is contained in:
commit
32bdb3d617
|
@ -597,6 +597,13 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of routing nodes
|
||||||
|
*/
|
||||||
|
public int size() {
|
||||||
|
return nodesToShards.size();
|
||||||
|
}
|
||||||
|
|
||||||
public static final class UnassignedShards implements Iterable<ShardRouting> {
|
public static final class UnassignedShards implements Iterable<ShardRouting> {
|
||||||
|
|
||||||
private final RoutingNodes nodes;
|
private final RoutingNodes nodes;
|
||||||
|
|
|
@ -20,7 +20,6 @@
|
||||||
package org.elasticsearch.cluster.routing.allocation;
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||||
import org.apache.lucene.util.ArrayUtil;
|
|
||||||
import org.elasticsearch.cluster.ClusterInfoService;
|
import org.elasticsearch.cluster.ClusterInfoService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||||
|
@ -36,13 +35,13 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.gateway.GatewayAllocator;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -63,14 +62,17 @@ import java.util.stream.Collectors;
|
||||||
public class AllocationService extends AbstractComponent {
|
public class AllocationService extends AbstractComponent {
|
||||||
|
|
||||||
private final AllocationDeciders allocationDeciders;
|
private final AllocationDeciders allocationDeciders;
|
||||||
|
private final GatewayAllocator gatewayAllocator;
|
||||||
|
private final ShardsAllocator shardsAllocator;
|
||||||
private final ClusterInfoService clusterInfoService;
|
private final ClusterInfoService clusterInfoService;
|
||||||
private final ShardsAllocators shardsAllocators;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) {
|
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
|
||||||
|
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.allocationDeciders = allocationDeciders;
|
this.allocationDeciders = allocationDeciders;
|
||||||
this.shardsAllocators = shardsAllocators;
|
this.gatewayAllocator = gatewayAllocator;
|
||||||
|
this.shardsAllocator = shardsAllocator;
|
||||||
this.clusterInfoService = clusterInfoService;
|
this.clusterInfoService = clusterInfoService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +94,7 @@ public class AllocationService extends AbstractComponent {
|
||||||
if (!changed) {
|
if (!changed) {
|
||||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||||
}
|
}
|
||||||
shardsAllocators.applyStartedShards(allocation);
|
gatewayAllocator.applyStartedShards(allocation);
|
||||||
if (withReroute) {
|
if (withReroute) {
|
||||||
reroute(allocation);
|
reroute(allocation);
|
||||||
}
|
}
|
||||||
|
@ -192,7 +194,7 @@ public class AllocationService extends AbstractComponent {
|
||||||
if (!changed) {
|
if (!changed) {
|
||||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||||
}
|
}
|
||||||
shardsAllocators.applyFailedShards(allocation);
|
gatewayAllocator.applyFailedShards(allocation);
|
||||||
reroute(allocation);
|
reroute(allocation);
|
||||||
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
|
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
|
||||||
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
|
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
|
||||||
|
@ -306,14 +308,10 @@ public class AllocationService extends AbstractComponent {
|
||||||
if (allocation.routingNodes().unassigned().size() > 0) {
|
if (allocation.routingNodes().unassigned().size() > 0) {
|
||||||
updateLeftDelayOfUnassignedShards(allocation, settings);
|
updateLeftDelayOfUnassignedShards(allocation, settings);
|
||||||
|
|
||||||
changed |= shardsAllocators.allocateUnassigned(allocation);
|
changed |= gatewayAllocator.allocateUnassigned(allocation);
|
||||||
}
|
}
|
||||||
|
|
||||||
// move shards that no longer can be allocated
|
changed |= shardsAllocator.allocate(allocation);
|
||||||
changed |= shardsAllocators.moveShards(allocation);
|
|
||||||
|
|
||||||
// rebalance
|
|
||||||
changed |= shardsAllocators.rebalance(allocation);
|
|
||||||
assert RoutingNodes.assertShardStats(allocation.routingNodes());
|
assert RoutingNodes.assertShardStats(allocation.routingNodes());
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|
||||||
import org.apache.lucene.util.ArrayUtil;
|
import org.apache.lucene.util.ArrayUtil;
|
||||||
import org.apache.lucene.util.IntroSorter;
|
import org.apache.lucene.util.IntroSorter;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -28,9 +27,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||||
|
@ -42,18 +39,14 @@ import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.gateway.PriorityComparator;
|
import org.elasticsearch.gateway.PriorityComparator;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Predicate;
|
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||||
|
|
||||||
|
@ -103,27 +96,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyStartedShards(StartedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ }
|
public boolean allocate(RoutingAllocation allocation) {
|
||||||
|
if (allocation.routingNodes().size() == 0) {
|
||||||
@Override
|
/* with no nodes this is pointless */
|
||||||
public void applyFailedShards(FailedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ }
|
return false;
|
||||||
|
}
|
||||||
@Override
|
|
||||||
public boolean allocateUnassigned(RoutingAllocation allocation) {
|
|
||||||
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
||||||
return balancer.allocateUnassigned();
|
boolean changed = balancer.allocateUnassigned();
|
||||||
}
|
changed |= balancer.moveShards();
|
||||||
|
changed |= balancer.balance();
|
||||||
@Override
|
return changed;
|
||||||
public boolean rebalance(RoutingAllocation allocation) {
|
|
||||||
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
|
||||||
return balancer.balance();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean moveShards(RoutingAllocation allocation) {
|
|
||||||
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
|
||||||
return balancer.moveShards();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -203,8 +185,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
|
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
|
||||||
final float weightShard = (node.numShards() + numAdditionalShards - balancer.avgShardsPerNode());
|
final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
|
||||||
final float weightIndex = (node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index));
|
final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
|
||||||
return theta0 * weightShard + theta1 * weightIndex;
|
return theta0 * weightShard + theta1 * weightIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,7 +198,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
public static class Balancer {
|
public static class Balancer {
|
||||||
private final ESLogger logger;
|
private final ESLogger logger;
|
||||||
private final Map<String, ModelNode> nodes = new HashMap<>();
|
private final Map<String, ModelNode> nodes = new HashMap<>();
|
||||||
private final HashSet<String> indices = new HashSet<>();
|
|
||||||
private final RoutingAllocation allocation;
|
private final RoutingAllocation allocation;
|
||||||
private final RoutingNodes routingNodes;
|
private final RoutingNodes routingNodes;
|
||||||
private final WeightFunction weight;
|
private final WeightFunction weight;
|
||||||
|
@ -225,19 +206,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
private final MetaData metaData;
|
private final MetaData metaData;
|
||||||
private final float avgShardsPerNode;
|
private final float avgShardsPerNode;
|
||||||
|
|
||||||
private final Predicate<ShardRouting> assignedFilter = shard -> shard.assignedToNode();
|
|
||||||
|
|
||||||
public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
|
public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.allocation = allocation;
|
this.allocation = allocation;
|
||||||
this.weight = weight;
|
this.weight = weight;
|
||||||
this.threshold = threshold;
|
this.threshold = threshold;
|
||||||
this.routingNodes = allocation.routingNodes();
|
this.routingNodes = allocation.routingNodes();
|
||||||
for (RoutingNode node : routingNodes) {
|
|
||||||
nodes.put(node.nodeId(), new ModelNode(node.nodeId()));
|
|
||||||
}
|
|
||||||
metaData = routingNodes.metaData();
|
metaData = routingNodes.metaData();
|
||||||
avgShardsPerNode = ((float) metaData.totalNumberOfShards()) / nodes.size();
|
avgShardsPerNode = ((float) metaData.totalNumberOfShards()) / routingNodes.size();
|
||||||
|
buildModelFromAssigned();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -271,17 +248,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
return new NodeSorter(nodesArray(), weight, this);
|
return new NodeSorter(nodesArray(), weight, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean initialize(RoutingNodes routing, RoutingNodes.UnassignedShards unassigned) {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Start distributing Shards");
|
|
||||||
}
|
|
||||||
for (ObjectCursor<String> index : allocation.routingTable().indicesRouting().keys()) {
|
|
||||||
indices.add(index.value);
|
|
||||||
}
|
|
||||||
buildModelFromAssigned(routing.shards(assignedFilter));
|
|
||||||
return allocateUnassigned(unassigned);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static float absDelta(float lower, float higher) {
|
private static float absDelta(float lower, float higher) {
|
||||||
assert higher >= lower : higher + " lt " + lower +" but was expected to be gte";
|
assert higher >= lower : higher + " lt " + lower +" but was expected to be gte";
|
||||||
return Math.abs(higher - lower);
|
return Math.abs(higher - lower);
|
||||||
|
@ -295,12 +261,36 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocates all possible unassigned shards
|
* Balances the nodes on the cluster model according to the weight function.
|
||||||
|
* The actual balancing is delegated to {@link #balanceByWeights()}
|
||||||
|
*
|
||||||
* @return <code>true</code> if the current configuration has been
|
* @return <code>true</code> if the current configuration has been
|
||||||
* changed, otherwise <code>false</code>
|
* changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
final boolean allocateUnassigned() {
|
private boolean balance() {
|
||||||
return balance(true);
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Start balancing cluster");
|
||||||
|
}
|
||||||
|
if (allocation.hasPendingAsyncFetch()) {
|
||||||
|
/*
|
||||||
|
* see https://github.com/elastic/elasticsearch/issues/14387
|
||||||
|
* if we allow rebalance operations while we are still fetching shard store data
|
||||||
|
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
|
||||||
|
* since once the fetches come back we might just move all the shards back again.
|
||||||
|
* Therefore we only do a rebalance if we have fetched all information.
|
||||||
|
*/
|
||||||
|
logger.debug("skipping rebalance due to in-flight shard/store fetches");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
|
||||||
|
logger.trace("skipping rebalance as it is disabled");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (nodes.size() < 2) { /* skip if we only have one node */
|
||||||
|
logger.trace("skipping rebalance as single node only");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return balanceByWeights();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -317,120 +307,100 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* @return <code>true</code> if the current configuration has been
|
* @return <code>true</code> if the current configuration has been
|
||||||
* changed, otherwise <code>false</code>
|
* changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
public boolean balance() {
|
private boolean balanceByWeights() {
|
||||||
return balance(false);
|
boolean changed = false;
|
||||||
}
|
final NodeSorter sorter = newNodeSorter();
|
||||||
|
final AllocationDeciders deciders = allocation.deciders();
|
||||||
|
final ModelNode[] modelNodes = sorter.modelNodes;
|
||||||
|
final float[] weights = sorter.weights;
|
||||||
|
for (String index : buildWeightOrderedIndices(sorter)) {
|
||||||
|
IndexMetaData indexMetaData = metaData.index(index);
|
||||||
|
|
||||||
private boolean balance(boolean onlyAssign) {
|
// find nodes that have a shard of this index or where shards of this index are allowed to stay
|
||||||
if (this.nodes.isEmpty()) {
|
// move these nodes to the front of modelNodes so that we can only balance based on these nodes
|
||||||
/* with no nodes this is pointless */
|
int relevantNodes = 0;
|
||||||
return false;
|
for (int i = 0; i < modelNodes.length; i++) {
|
||||||
}
|
ModelNode modelNode = modelNodes[i];
|
||||||
if (logger.isTraceEnabled()) {
|
if (modelNode.getIndex(index) != null
|
||||||
if (onlyAssign) {
|
|| deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
|
||||||
logger.trace("Start balancing cluster");
|
// swap nodes at position i and relevantNodes
|
||||||
} else {
|
modelNodes[i] = modelNodes[relevantNodes];
|
||||||
logger.trace("Start assigning unassigned shards");
|
modelNodes[relevantNodes] = modelNode;
|
||||||
|
relevantNodes++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
|
||||||
boolean changed = initialize(routingNodes, unassigned);
|
|
||||||
if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
|
|
||||||
NodeSorter sorter = newNodeSorter();
|
|
||||||
if (nodes.size() > 1) { /* skip if we only have one node */
|
|
||||||
AllocationDeciders deciders = allocation.deciders();
|
|
||||||
final ModelNode[] modelNodes = sorter.modelNodes;
|
|
||||||
final float[] weights = sorter.weights;
|
|
||||||
for (String index : buildWeightOrderedIndices(sorter)) {
|
|
||||||
IndexMetaData indexMetaData = metaData.index(index);
|
|
||||||
|
|
||||||
// find nodes that have a shard of this index or where shards of this index are allowed to stay
|
if (relevantNodes < 2) {
|
||||||
// move these nodes to the front of modelNodes so that we can only balance based on these nodes
|
continue;
|
||||||
int relevantNodes = 0;
|
}
|
||||||
for (int i = 0; i < modelNodes.length; i++) {
|
|
||||||
ModelNode modelNode = modelNodes[i];
|
sorter.reset(index, 0, relevantNodes);
|
||||||
if (modelNode.getIndex(index) != null
|
int lowIdx = 0;
|
||||||
|| deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(routingNodes), allocation).type() != Type.NO) {
|
int highIdx = relevantNodes - 1;
|
||||||
// swap nodes at position i and relevantNodes
|
while (true) {
|
||||||
modelNodes[i] = modelNodes[relevantNodes];
|
final ModelNode minNode = modelNodes[lowIdx];
|
||||||
modelNodes[relevantNodes] = modelNode;
|
final ModelNode maxNode = modelNodes[highIdx];
|
||||||
relevantNodes++;
|
advance_range:
|
||||||
|
if (maxNode.numShards(index) > 0) {
|
||||||
|
final float delta = absDelta(weights[lowIdx], weights[highIdx]);
|
||||||
|
if (lessThan(delta, threshold)) {
|
||||||
|
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
|
||||||
|
&& (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all
|
||||||
|
) {
|
||||||
|
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
|
||||||
|
* due to some allocation decider restrictions like zone awareness. if one zone has for instance
|
||||||
|
* less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we
|
||||||
|
* can't move to the "lighter" shards since otherwise the zone would go over capacity.
|
||||||
|
*
|
||||||
|
* This break jumps straight to the condition below were we start moving from the high index towards
|
||||||
|
* the low index to shrink the window we are considering for balance from the other direction.
|
||||||
|
* (check shrinking the window from MAX to MIN)
|
||||||
|
* See #3580
|
||||||
|
*/
|
||||||
|
break advance_range;
|
||||||
}
|
}
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
|
||||||
|
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
if (relevantNodes < 2) {
|
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
|
||||||
|
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
||||||
|
}
|
||||||
|
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
|
||||||
|
* a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */
|
||||||
|
if (tryRelocateShard(minNode, maxNode, index, delta)) {
|
||||||
|
/*
|
||||||
|
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
|
||||||
|
* we could just find the place to insert linearly but the win might be minor
|
||||||
|
* compared to the added complexity
|
||||||
|
*/
|
||||||
|
weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
|
||||||
|
weights[highIdx] = sorter.weight(modelNodes[highIdx]);
|
||||||
|
sorter.sort(0, relevantNodes);
|
||||||
|
lowIdx = 0;
|
||||||
|
highIdx = relevantNodes - 1;
|
||||||
|
changed = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
sorter.reset(index, 0, relevantNodes);
|
if (lowIdx < highIdx - 1) {
|
||||||
int lowIdx = 0;
|
/* Shrinking the window from MIN to MAX
|
||||||
int highIdx = relevantNodes - 1;
|
* we can't move from any shard from the min node lets move on to the next node
|
||||||
while (true) {
|
* and see if the threshold still holds. We either don't have any shard of this
|
||||||
final ModelNode minNode = modelNodes[lowIdx];
|
* index on this node of allocation deciders prevent any relocation.*/
|
||||||
final ModelNode maxNode = modelNodes[highIdx];
|
lowIdx++;
|
||||||
advance_range:
|
} else if (lowIdx > 0) {
|
||||||
if (maxNode.numShards(index) > 0) {
|
/* Shrinking the window from MAX to MIN
|
||||||
final float delta = absDelta(weights[lowIdx], weights[highIdx]);
|
* now we go max to min since obviously we can't move anything to the max node
|
||||||
if (lessThan(delta, threshold)) {
|
* lets pick the next highest */
|
||||||
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
|
lowIdx = 0;
|
||||||
&& (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all
|
highIdx--;
|
||||||
) {
|
} else {
|
||||||
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
|
/* we are done here, we either can't relocate anymore or we are balanced */
|
||||||
* due to some allocation decider restrictions like zone awareness. if one zone has for instance
|
break;
|
||||||
* less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we
|
|
||||||
* can't move to the "lighter" shards since otherwise the zone would go over capacity.
|
|
||||||
*
|
|
||||||
* This break jumps straight to the condition below were we start moving from the high index towards
|
|
||||||
* the low index to shrink the window we are considering for balance from the other direction.
|
|
||||||
* (check shrinking the window from MAX to MIN)
|
|
||||||
* See #3580
|
|
||||||
*/
|
|
||||||
break advance_range;
|
|
||||||
}
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
|
|
||||||
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
|
|
||||||
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
|
|
||||||
}
|
|
||||||
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
|
|
||||||
* a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */
|
|
||||||
if (tryRelocateShard(minNode, maxNode, index, delta)) {
|
|
||||||
/*
|
|
||||||
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
|
|
||||||
* we could just find the place to insert linearly but the win might be minor
|
|
||||||
* compared to the added complexity
|
|
||||||
*/
|
|
||||||
weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
|
|
||||||
weights[highIdx] = sorter.weight(modelNodes[highIdx]);
|
|
||||||
sorter.sort(0, relevantNodes);
|
|
||||||
lowIdx = 0;
|
|
||||||
highIdx = relevantNodes - 1;
|
|
||||||
changed = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (lowIdx < highIdx - 1) {
|
|
||||||
/* Shrinking the window from MIN to MAX
|
|
||||||
* we can't move from any shard from the min node lets move on to the next node
|
|
||||||
* and see if the threshold still holds. We either don't have any shard of this
|
|
||||||
* index on this node of allocation deciders prevent any relocation.*/
|
|
||||||
lowIdx++;
|
|
||||||
} else if (lowIdx > 0) {
|
|
||||||
/* Shrinking the window from MAX to MIN
|
|
||||||
* now we go max to min since obviously we can't move anything to the max node
|
|
||||||
* lets pick the next highest */
|
|
||||||
lowIdx = 0;
|
|
||||||
highIdx--;
|
|
||||||
} else {
|
|
||||||
/* we are done here, we either can't relocate anymore or we are balanced */
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -451,7 +421,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* to the nodes we relocated them from.
|
* to the nodes we relocated them from.
|
||||||
*/
|
*/
|
||||||
private String[] buildWeightOrderedIndices(NodeSorter sorter) {
|
private String[] buildWeightOrderedIndices(NodeSorter sorter) {
|
||||||
final String[] indices = this.indices.toArray(new String[this.indices.size()]);
|
final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
|
||||||
final float[] deltas = new float[indices.length];
|
final float[] deltas = new float[indices.length];
|
||||||
for (int i = 0; i < deltas.length; i++) {
|
for (int i = 0; i < deltas.length; i++) {
|
||||||
sorter.reset(indices[i]);
|
sorter.reset(indices[i]);
|
||||||
|
@ -503,20 +473,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
public boolean moveShards() {
|
public boolean moveShards() {
|
||||||
if (nodes.isEmpty()) {
|
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
|
||||||
/* with no nodes this is pointless */
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
|
|
||||||
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
|
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
|
||||||
// offloading the shards.
|
// offloading the shards.
|
||||||
List<ShardRouting> shards = new ArrayList<>();
|
boolean changed = false;
|
||||||
int index = 0;
|
int index = 0;
|
||||||
boolean found = true;
|
boolean found = true;
|
||||||
|
final NodeSorter sorter = newNodeSorter();
|
||||||
while (found) {
|
while (found) {
|
||||||
found = false;
|
found = false;
|
||||||
for (RoutingNode routingNode : routingNodes) {
|
for (RoutingNode routingNode : allocation.routingNodes()) {
|
||||||
if (index >= routingNode.size()) {
|
if (index >= routingNode.size()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -524,64 +490,52 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
ShardRouting shardRouting = routingNode.get(index);
|
ShardRouting shardRouting = routingNode.get(index);
|
||||||
// we can only move started shards...
|
// we can only move started shards...
|
||||||
if (shardRouting.started()) {
|
if (shardRouting.started()) {
|
||||||
shards.add(shardRouting);
|
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
||||||
|
assert sourceNode != null && sourceNode.containsShard(shardRouting);
|
||||||
|
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
||||||
|
if (decision.type() == Decision.Type.NO) {
|
||||||
|
changed |= moveShard(sorter, shardRouting, sourceNode, routingNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
if (shards.isEmpty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
return changed;
|
||||||
boolean changed = initialize(routingNodes, unassigned);
|
}
|
||||||
if (changed == false) {
|
|
||||||
final NodeSorter sorter = newNodeSorter();
|
/**
|
||||||
final ModelNode[] modelNodes = sorter.modelNodes;
|
* Move started shard to the minimal eligible node with respect to the weight function
|
||||||
for (ShardRouting shardRouting : shards) {
|
*
|
||||||
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
* @return <code>true</code> if the shard was moved successfully, otherwise <code>false</code>
|
||||||
assert sourceNode != null && sourceNode.containsShard(shardRouting);
|
*/
|
||||||
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
|
private boolean moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) {
|
||||||
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
||||||
if (decision.type() == Decision.Type.NO) {
|
sorter.reset(shardRouting.getIndexName());
|
||||||
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
/*
|
||||||
sorter.reset(shardRouting.getIndexName());
|
* the sorter holds the minimum weight node first for the shards index.
|
||||||
/*
|
* We now walk through the nodes until we find a node to allocate the shard.
|
||||||
* the sorter holds the minimum weight node first for the shards index.
|
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||||
* We now walk through the nodes until we find a node to allocate the shard.
|
* allocate on the minimal eligible node.
|
||||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
*/
|
||||||
* allocate on the minimal eligible node.
|
for (ModelNode currentNode : sorter.modelNodes) {
|
||||||
*/
|
if (currentNode != sourceNode) {
|
||||||
boolean moved = false;
|
RoutingNode target = currentNode.getRoutingNode();
|
||||||
for (ModelNode currentNode : modelNodes) {
|
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
||||||
if (currentNode == sourceNode) {
|
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
|
||||||
continue;
|
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||||
}
|
sourceNode.removeShard(shardRouting);
|
||||||
RoutingNode target = currentNode.getRoutingNode(routingNodes);
|
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||||
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
currentNode.addShard(targetRelocatingShard);
|
||||||
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
|
if (logger.isTraceEnabled()) {
|
||||||
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
|
||||||
Decision sourceDecision = sourceNode.removeShard(shardRouting);
|
|
||||||
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
|
||||||
// re-add (now relocating shard) to source node
|
|
||||||
sourceNode.addShard(shardRouting, sourceDecision);
|
|
||||||
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
|
||||||
currentNode.addShard(targetRelocatingShard, targetDecision);
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
|
|
||||||
}
|
|
||||||
moved = true;
|
|
||||||
changed = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (moved == false) {
|
|
||||||
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return changed;
|
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -593,18 +547,19 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* on the target node which we respect during the allocation / balancing
|
* on the target node which we respect during the allocation / balancing
|
||||||
* process. In short, this method recreates the status-quo in the cluster.
|
* process. In short, this method recreates the status-quo in the cluster.
|
||||||
*/
|
*/
|
||||||
private void buildModelFromAssigned(Iterable<ShardRouting> shards) {
|
private void buildModelFromAssigned() {
|
||||||
for (ShardRouting shard : shards) {
|
for (RoutingNode rn : routingNodes) {
|
||||||
assert shard.assignedToNode();
|
ModelNode node = new ModelNode(rn);
|
||||||
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
|
nodes.put(rn.nodeId(), node);
|
||||||
if (shard.state() == RELOCATING) {
|
for (ShardRouting shard : rn) {
|
||||||
continue;
|
assert rn.nodeId().equals(shard.currentNodeId());
|
||||||
}
|
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
|
||||||
ModelNode node = nodes.get(shard.currentNodeId());
|
if (shard.state() != RELOCATING) {
|
||||||
assert node != null;
|
node.addShard(shard);
|
||||||
node.addShard(shard, Decision.single(Type.YES, "Already allocated on node", node.getNodeId()));
|
if (logger.isTraceEnabled()) {
|
||||||
if (logger.isTraceEnabled()) {
|
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
|
||||||
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -612,8 +567,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
/**
|
/**
|
||||||
* Allocates all given shards on the minimal eligible node for the shards index
|
* Allocates all given shards on the minimal eligible node for the shards index
|
||||||
* with respect to the weight function. All given shards must be unassigned.
|
* with respect to the weight function. All given shards must be unassigned.
|
||||||
|
* @return <code>true</code> if the current configuration has been
|
||||||
|
* changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned) {
|
private boolean allocateUnassigned() {
|
||||||
|
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
||||||
assert !nodes.isEmpty();
|
assert !nodes.isEmpty();
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Start allocating unassigned shards");
|
logger.trace("Start allocating unassigned shards");
|
||||||
|
@ -657,7 +615,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
int secondaryLength = 0;
|
int secondaryLength = 0;
|
||||||
int primaryLength = primary.length;
|
int primaryLength = primary.length;
|
||||||
ArrayUtil.timSort(primary, comparator);
|
ArrayUtil.timSort(primary, comparator);
|
||||||
final Set<ModelNode> throttledNodes = Collections.newSetFromMap(new IdentityHashMap<ModelNode, Boolean>());
|
final Set<ModelNode> throttledNodes = Collections.newSetFromMap(new IdentityHashMap<>());
|
||||||
do {
|
do {
|
||||||
for (int i = 0; i < primaryLength; i++) {
|
for (int i = 0; i < primaryLength; i++) {
|
||||||
ShardRouting shard = primary[i];
|
ShardRouting shard = primary[i];
|
||||||
|
@ -695,7 +653,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* don't check deciders
|
* don't check deciders
|
||||||
*/
|
*/
|
||||||
if (currentWeight <= minWeight) {
|
if (currentWeight <= minWeight) {
|
||||||
Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(routingNodes), allocation);
|
Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(), allocation);
|
||||||
NOUPDATE:
|
NOUPDATE:
|
||||||
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
|
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
|
||||||
if (currentWeight == minWeight) {
|
if (currentWeight == minWeight) {
|
||||||
|
@ -736,7 +694,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
assert decision != null && minNode != null || decision == null && minNode == null;
|
assert decision != null && minNode != null || decision == null && minNode == null;
|
||||||
if (minNode != null) {
|
if (minNode != null) {
|
||||||
minNode.addShard(shard, decision);
|
minNode.addShard(shard);
|
||||||
if (decision.type() == Type.YES) {
|
if (decision.type() == Type.YES) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
||||||
|
@ -745,7 +703,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
changed = true;
|
changed = true;
|
||||||
continue; // don't add to ignoreUnassigned
|
continue; // don't add to ignoreUnassigned
|
||||||
} else {
|
} else {
|
||||||
final RoutingNode node = minNode.getRoutingNode(routingNodes);
|
final RoutingNode node = minNode.getRoutingNode();
|
||||||
if (deciders.canAllocate(node, allocation).type() != Type.YES) {
|
if (deciders.canAllocate(node, allocation).type() != Type.YES) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
|
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
|
||||||
|
@ -791,10 +749,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
ShardRouting candidate = null;
|
ShardRouting candidate = null;
|
||||||
final AllocationDeciders deciders = allocation.deciders();
|
final AllocationDeciders deciders = allocation.deciders();
|
||||||
for (ShardRouting shard : index.getAllShards()) {
|
for (ShardRouting shard : index) {
|
||||||
if (shard.started()) {
|
if (shard.started()) {
|
||||||
// skip initializing, unassigned and relocating shards we can't relocate them anyway
|
// skip initializing, unassigned and relocating shards we can't relocate them anyway
|
||||||
Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(routingNodes), allocation);
|
Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
|
||||||
Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
|
Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
|
||||||
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
|
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
|
||||||
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
|
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
|
||||||
|
@ -815,24 +773,17 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
if (candidate != null) {
|
if (candidate != null) {
|
||||||
|
|
||||||
/* allocate on the model even if not throttled */
|
/* allocate on the model even if not throttled */
|
||||||
maxNode.removeShard(candidate);
|
maxNode.removeShard(candidate);
|
||||||
minNode.addShard(candidate, decision);
|
minNode.addShard(candidate);
|
||||||
if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */
|
if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(),
|
logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(),
|
||||||
minNode.getNodeId());
|
minNode.getNodeId());
|
||||||
}
|
}
|
||||||
/* now allocate on the cluster - if we are started we need to relocate the shard */
|
/* now allocate on the cluster */
|
||||||
if (candidate.started()) {
|
routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||||
routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
|
||||||
|
|
||||||
} else {
|
|
||||||
routingNodes.initialize(candidate, minNode.getNodeId(), null, allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -846,14 +797,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
static class ModelNode implements Iterable<ModelIndex> {
|
static class ModelNode implements Iterable<ModelIndex> {
|
||||||
private final String id;
|
|
||||||
private final Map<String, ModelIndex> indices = new HashMap<>();
|
private final Map<String, ModelIndex> indices = new HashMap<>();
|
||||||
private int numShards = 0;
|
private int numShards = 0;
|
||||||
// lazily calculated
|
private final RoutingNode routingNode;
|
||||||
private RoutingNode routingNode;
|
|
||||||
|
|
||||||
public ModelNode(String id) {
|
public ModelNode(RoutingNode routingNode) {
|
||||||
this.id = id;
|
this.routingNode = routingNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ModelIndex getIndex(String indexId) {
|
public ModelIndex getIndex(String indexId) {
|
||||||
|
@ -861,13 +810,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getNodeId() {
|
public String getNodeId() {
|
||||||
return id;
|
return routingNode.nodeId();
|
||||||
}
|
}
|
||||||
|
|
||||||
public RoutingNode getRoutingNode(RoutingNodes routingNodes) {
|
public RoutingNode getRoutingNode() {
|
||||||
if (routingNode == null) {
|
|
||||||
routingNode = routingNodes.node(id);
|
|
||||||
}
|
|
||||||
return routingNode;
|
return routingNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -888,33 +834,31 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addShard(ShardRouting shard, Decision decision) {
|
public void addShard(ShardRouting shard) {
|
||||||
ModelIndex index = indices.get(shard.getIndexName());
|
ModelIndex index = indices.get(shard.getIndexName());
|
||||||
if (index == null) {
|
if (index == null) {
|
||||||
index = new ModelIndex(shard.getIndexName());
|
index = new ModelIndex(shard.getIndexName());
|
||||||
indices.put(index.getIndexId(), index);
|
indices.put(index.getIndexId(), index);
|
||||||
}
|
}
|
||||||
index.addShard(shard, decision);
|
index.addShard(shard);
|
||||||
numShards++;
|
numShards++;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Decision removeShard(ShardRouting shard) {
|
public void removeShard(ShardRouting shard) {
|
||||||
ModelIndex index = indices.get(shard.getIndexName());
|
ModelIndex index = indices.get(shard.getIndexName());
|
||||||
Decision removed = null;
|
|
||||||
if (index != null) {
|
if (index != null) {
|
||||||
removed = index.removeShard(shard);
|
index.removeShard(shard);
|
||||||
if (removed != null && index.numShards() == 0) {
|
if (index.numShards() == 0) {
|
||||||
indices.remove(shard.getIndexName());
|
indices.remove(shard.getIndexName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
numShards--;
|
numShards--;
|
||||||
return removed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append("Node(").append(id).append(")");
|
sb.append("Node(").append(routingNode.nodeId()).append(")");
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -930,9 +874,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class ModelIndex {
|
static final class ModelIndex implements Iterable<ShardRouting> {
|
||||||
private final String id;
|
private final String id;
|
||||||
private final Map<ShardRouting, Decision> shards = new HashMap<>();
|
private final Set<ShardRouting> shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node
|
||||||
private int highestPrimary = -1;
|
private int highestPrimary = -1;
|
||||||
|
|
||||||
public ModelIndex(String id) {
|
public ModelIndex(String id) {
|
||||||
|
@ -942,7 +886,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
public int highestPrimary() {
|
public int highestPrimary() {
|
||||||
if (highestPrimary == -1) {
|
if (highestPrimary == -1) {
|
||||||
int maxId = -1;
|
int maxId = -1;
|
||||||
for (ShardRouting shard : shards.keySet()) {
|
for (ShardRouting shard : shards) {
|
||||||
if (shard.primary()) {
|
if (shard.primary()) {
|
||||||
maxId = Math.max(maxId, shard.id());
|
maxId = Math.max(maxId, shard.id());
|
||||||
}
|
}
|
||||||
|
@ -960,24 +904,25 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
return shards.size();
|
return shards.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<ShardRouting> getAllShards() {
|
@Override
|
||||||
return shards.keySet();
|
public Iterator<ShardRouting> iterator() {
|
||||||
|
return shards.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Decision removeShard(ShardRouting shard) {
|
public void removeShard(ShardRouting shard) {
|
||||||
highestPrimary = -1;
|
highestPrimary = -1;
|
||||||
return shards.remove(shard);
|
assert shards.contains(shard) : "Shard not allocated on current node: " + shard;
|
||||||
|
shards.remove(shard);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addShard(ShardRouting shard, Decision decision) {
|
public void addShard(ShardRouting shard) {
|
||||||
highestPrimary = -1;
|
highestPrimary = -1;
|
||||||
assert decision != null;
|
assert !shards.contains(shard) : "Shard already allocated on current node: " + shard;
|
||||||
assert !shards.containsKey(shard) : "Shard already allocated on current node: " + shards.get(shard) + " " + shard;
|
shards.add(shard);
|
||||||
shards.put(shard, decision);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean containsShard(ShardRouting shard) {
|
public boolean containsShard(ShardRouting shard) {
|
||||||
return shards.containsKey(shard);
|
return shards.contains(shard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,56 +19,25 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster.
|
* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster.
|
||||||
* The allocator makes basic decision where a shard instance will be allocated, if already allocated instances
|
* The allocator makes basic decision where a shard instance will be allocated, if already allocated instances
|
||||||
* need relocate to other nodes due to node failures or due to rebalancing decisions.
|
* need to relocate to other nodes due to node failures or due to rebalancing decisions.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public interface ShardsAllocator {
|
public interface ShardsAllocator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Applies changes on started nodes based on the implemented algorithm. For example if a
|
* Allocates shards to nodes in the cluster. An implementation of this method should:
|
||||||
* shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
|
* - assign unassigned shards
|
||||||
* this allocator might apply some cleanups on the node that used to hold the shard.
|
* - relocate shards that cannot stay on a node anymore
|
||||||
* @param allocation all started {@link ShardRouting shards}
|
* - relocate shards to find a good shard balance in the cluster
|
||||||
*/
|
|
||||||
void applyStartedShards(StartedRerouteAllocation allocation);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Applies changes on failed nodes based on the implemented algorithm.
|
|
||||||
* @param allocation all failed {@link ShardRouting shards}
|
|
||||||
*/
|
|
||||||
void applyFailedShards(FailedRerouteAllocation allocation);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Assign all unassigned shards to nodes
|
|
||||||
*
|
*
|
||||||
* @param allocation current node allocation
|
* @param allocation current node allocation
|
||||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
boolean allocateUnassigned(RoutingAllocation allocation);
|
boolean allocate(RoutingAllocation allocation);
|
||||||
|
|
||||||
/**
|
|
||||||
* Rebalancing number of shards on all nodes
|
|
||||||
*
|
|
||||||
* @param allocation current node allocation
|
|
||||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
|
||||||
*/
|
|
||||||
boolean rebalance(RoutingAllocation allocation);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Move started shards that can not be allocated to a node anymore
|
|
||||||
*
|
|
||||||
* @param allocation current node allocation
|
|
||||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
|
||||||
*/
|
|
||||||
boolean moveShards(RoutingAllocation allocation);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,100 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.gateway.GatewayAllocator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The {@link ShardsAllocator} class offers methods for allocating shard within a cluster.
|
|
||||||
* These methods include moving shards and re-balancing the cluster. It also allows management
|
|
||||||
* of shards by their state.
|
|
||||||
*/
|
|
||||||
public class ShardsAllocators extends AbstractComponent implements ShardsAllocator {
|
|
||||||
|
|
||||||
private final GatewayAllocator gatewayAllocator;
|
|
||||||
private final ShardsAllocator allocator;
|
|
||||||
|
|
||||||
public ShardsAllocators(GatewayAllocator allocator) {
|
|
||||||
this(Settings.Builder.EMPTY_SETTINGS, allocator);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ShardsAllocators(Settings settings, GatewayAllocator allocator) {
|
|
||||||
this(settings, allocator, new BalancedShardsAllocator(settings));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public ShardsAllocators(Settings settings, GatewayAllocator gatewayAllocator, ShardsAllocator allocator) {
|
|
||||||
super(settings);
|
|
||||||
this.gatewayAllocator = gatewayAllocator;
|
|
||||||
this.allocator = allocator;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void applyStartedShards(StartedRerouteAllocation allocation) {
|
|
||||||
gatewayAllocator.applyStartedShards(allocation);
|
|
||||||
allocator.applyStartedShards(allocation);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void applyFailedShards(FailedRerouteAllocation allocation) {
|
|
||||||
gatewayAllocator.applyFailedShards(allocation);
|
|
||||||
allocator.applyFailedShards(allocation);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean allocateUnassigned(RoutingAllocation allocation) {
|
|
||||||
boolean changed = false;
|
|
||||||
changed |= gatewayAllocator.allocateUnassigned(allocation);
|
|
||||||
changed |= allocator.allocateUnassigned(allocation);
|
|
||||||
return changed;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected long nanoTime() {
|
|
||||||
return System.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean rebalance(RoutingAllocation allocation) {
|
|
||||||
if (allocation.hasPendingAsyncFetch() == false) {
|
|
||||||
/*
|
|
||||||
* see https://github.com/elastic/elasticsearch/issues/14387
|
|
||||||
* if we allow rebalance operations while we are still fetching shard store data
|
|
||||||
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
|
|
||||||
* since once the fetches come back we might just move all the shards back again.
|
|
||||||
* Therefore we only do a rebalance if we have fetched all information.
|
|
||||||
*/
|
|
||||||
return allocator.rebalance(allocation);
|
|
||||||
} else {
|
|
||||||
logger.debug("skipping rebalance due to in-flight shard/store fetches");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean moveShards(RoutingAllocation allocation) {
|
|
||||||
return allocator.moveShards(allocation);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -48,19 +48,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||||
|
|
||||||
static class FakeShardsAllocator implements ShardsAllocator {
|
static class FakeShardsAllocator implements ShardsAllocator {
|
||||||
@Override
|
@Override
|
||||||
public void applyStartedShards(StartedRerouteAllocation allocation) {}
|
public boolean allocate(RoutingAllocation allocation) {
|
||||||
@Override
|
|
||||||
public void applyFailedShards(FailedRerouteAllocation allocation) {}
|
|
||||||
@Override
|
|
||||||
public boolean allocateUnassigned(RoutingAllocation allocation) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public boolean rebalance(RoutingAllocation allocation) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public boolean moveShards(RoutingAllocation allocation) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
|
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
|
@ -311,29 +310,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
||||||
public void testNoRebalanceOnPrimaryOverload() {
|
public void testNoRebalanceOnPrimaryOverload() {
|
||||||
Settings.Builder settings = settingsBuilder();
|
Settings.Builder settings = settingsBuilder();
|
||||||
AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(),
|
AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(),
|
||||||
new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), new ShardsAllocators(settings.build(),
|
new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
|
||||||
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() {
|
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() {
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean rebalance(RoutingAllocation allocation) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean moveShards(RoutingAllocation allocation) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void applyStartedShards(StartedRerouteAllocation allocation) {
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void applyFailedShards(FailedRerouteAllocation allocation) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* // this allocator tries to rebuild this scenario where a rebalance is
|
* // this allocator tries to rebuild this scenario where a rebalance is
|
||||||
* // triggered solely by the primary overload on node [1] where a shard
|
* // triggered solely by the primary overload on node [1] where a shard
|
||||||
|
@ -354,9 +333,8 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
||||||
--------[test][2], node[3], [P], s[STARTED]
|
--------[test][2], node[3], [P], s[STARTED]
|
||||||
--------[test][3], node[3], [P], s[STARTED]
|
--------[test][3], node[3], [P], s[STARTED]
|
||||||
---- unassigned
|
---- unassigned
|
||||||
*/
|
*/
|
||||||
@Override
|
public boolean allocate(RoutingAllocation allocation) {
|
||||||
public boolean allocateUnassigned(RoutingAllocation allocation) {
|
|
||||||
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
|
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
|
||||||
boolean changed = !unassigned.isEmpty();
|
boolean changed = !unassigned.isEmpty();
|
||||||
ShardRouting[] drain = unassigned.drain();
|
ShardRouting[] drain = unassigned.drain();
|
||||||
|
@ -403,7 +381,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
||||||
}
|
}
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
}), EmptyClusterInfoService.INSTANCE);
|
}, EmptyClusterInfoService.INSTANCE);
|
||||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||||
IndexMetaData.Builder indexMeta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1);
|
IndexMetaData.Builder indexMeta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1);
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
|
@ -333,7 +333,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
||||||
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)});
|
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)});
|
||||||
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
||||||
allocationDeciders,
|
allocationDeciders,
|
||||||
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
|
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||||
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
||||||
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
|
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
|
||||||
state = ClusterState.builder(state).routingResult(result).build();
|
state = ClusterState.builder(state).routingResult(result).build();
|
||||||
|
@ -363,7 +363,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
||||||
new NodeVersionAllocationDecider(Settings.EMPTY)});
|
new NodeVersionAllocationDecider(Settings.EMPTY)});
|
||||||
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
||||||
allocationDeciders,
|
allocationDeciders,
|
||||||
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
|
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||||
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
||||||
|
|
||||||
// Make sure that primary shards are only allocated on the new node
|
// Make sure that primary shards are only allocated on the new node
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
|
@ -59,7 +59,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
|
||||||
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom());
|
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom());
|
||||||
AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY,
|
AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY,
|
||||||
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
|
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
|
||||||
randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
|
randomAllocationDecider))), NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||||
int indices = scaledRandomIntBetween(1, 20);
|
int indices = scaledRandomIntBetween(1, 20);
|
||||||
Builder metaBuilder = MetaData.builder();
|
Builder metaBuilder = MetaData.builder();
|
||||||
int maxNumReplicas = 1;
|
int maxNumReplicas = 1;
|
||||||
|
|
|
@ -39,7 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
|
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||||
|
@ -65,10 +65,6 @@ import static org.hamcrest.Matchers.nullValue;
|
||||||
|
|
||||||
public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
|
|
||||||
private static ShardsAllocators makeShardsAllocators() {
|
|
||||||
return new ShardsAllocators(NoopGatewayAllocator.INSTANCE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testDiskThreshold() {
|
public void testDiskThreshold() {
|
||||||
Settings diskSettings = settingsBuilder()
|
Settings diskSettings = settingsBuilder()
|
||||||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
|
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
|
||||||
|
@ -109,7 +105,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||||
|
@ -194,7 +190,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
||||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||||
|
@ -225,7 +221,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
||||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||||
|
@ -305,7 +301,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
|
||||||
|
@ -362,7 +358,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
||||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||||
|
@ -429,7 +425,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
||||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||||
|
@ -460,7 +456,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
|
||||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||||
|
@ -569,7 +565,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||||
|
@ -637,7 +633,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||||
|
@ -740,7 +736,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
|
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||||
|
@ -902,7 +898,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
// Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away
|
// Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away
|
||||||
// and therefor we will have sufficient disk space on node1.
|
// and therefor we will have sufficient disk space on node1.
|
||||||
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
|
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
|
||||||
|
@ -1003,7 +999,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
|
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(), deciders, makeShardsAllocators(), cis);
|
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
|
||||||
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
|
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
|
||||||
|
|
||||||
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
|
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
|
||||||
|
|
|
@ -492,7 +492,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
||||||
static class NoopAllocationService extends AllocationService {
|
static class NoopAllocationService extends AllocationService {
|
||||||
|
|
||||||
public NoopAllocationService(Settings settings) {
|
public NoopAllocationService(Settings settings) {
|
||||||
super(settings, null, null, null);
|
super(settings, null, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -33,7 +33,8 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
|
@ -79,19 +80,19 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
||||||
public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) {
|
public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) {
|
||||||
return new MockAllocationService(settings,
|
return new MockAllocationService(settings,
|
||||||
randomAllocationDeciders(settings, clusterSettings, random),
|
randomAllocationDeciders(settings, clusterSettings, random),
|
||||||
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
|
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
|
public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
|
||||||
return new MockAllocationService(settings,
|
return new MockAllocationService(settings,
|
||||||
randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
|
randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
|
||||||
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
|
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), clusterInfoService);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
|
public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) {
|
||||||
return new MockAllocationService(settings,
|
return new MockAllocationService(settings,
|
||||||
randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
|
randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
|
||||||
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE);
|
gatewayAllocator, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -193,8 +194,9 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
||||||
|
|
||||||
private Long nanoTimeOverride = null;
|
private Long nanoTimeOverride = null;
|
||||||
|
|
||||||
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) {
|
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
|
||||||
super(settings, allocationDeciders, shardsAllocators, clusterInfoService);
|
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
|
||||||
|
super(settings, allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setNanoTimeOverride(long nanoTime) {
|
public void setNanoTimeOverride(long nanoTime) {
|
||||||
|
|
Loading…
Reference in New Issue