Merge pull request #16926 from ywelsch/fix/balancer-move
Speed up shard balancer by reusing shard model while moving shards that can no longer be allocated to a node
This commit is contained in:
commit
675d940f01
|
@ -310,7 +310,7 @@ public class AllocationService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
// move shards that no longer can be allocated
|
// move shards that no longer can be allocated
|
||||||
changed |= moveShards(allocation);
|
changed |= shardsAllocators.moveShards(allocation);
|
||||||
|
|
||||||
// rebalance
|
// rebalance
|
||||||
changed |= shardsAllocators.rebalance(allocation);
|
changed |= shardsAllocators.rebalance(allocation);
|
||||||
|
@ -327,46 +327,6 @@ public class AllocationService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean moveShards(RoutingAllocation allocation) {
|
|
||||||
boolean changed = false;
|
|
||||||
|
|
||||||
// create a copy of the shards interleaving between nodes, and check if they can remain
|
|
||||||
List<ShardRouting> shards = new ArrayList<>();
|
|
||||||
int index = 0;
|
|
||||||
boolean found = true;
|
|
||||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
|
||||||
while (found) {
|
|
||||||
found = false;
|
|
||||||
for (RoutingNode routingNode : routingNodes) {
|
|
||||||
if (index >= routingNode.size()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
found = true;
|
|
||||||
shards.add(routingNode.get(index));
|
|
||||||
}
|
|
||||||
index++;
|
|
||||||
}
|
|
||||||
for (int i = 0; i < shards.size(); i++) {
|
|
||||||
ShardRouting shardRouting = shards.get(i);
|
|
||||||
// we can only move started shards...
|
|
||||||
if (!shardRouting.started()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
|
|
||||||
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
|
||||||
if (decision.type() == Decision.Type.NO) {
|
|
||||||
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
|
||||||
boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
|
|
||||||
if (!moved) {
|
|
||||||
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
|
||||||
} else {
|
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return changed;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
|
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
|
||||||
boolean changed = false;
|
boolean changed = false;
|
||||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.gateway.PriorityComparator;
|
import org.elasticsearch.gateway.PriorityComparator;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -49,6 +50,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -119,9 +121,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
public boolean moveShards(RoutingAllocation allocation) {
|
||||||
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
||||||
return balancer.move(shardRouting, node);
|
return balancer.moveShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -489,58 +491,95 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function executes a move operation moving the given shard from
|
* Move started shards that can not be allocated to a node anymore
|
||||||
* the given node to the minimal eligible node with respect to the
|
*
|
||||||
* weight function. Iff the shard is moved the shard will be set to
|
* For each shard to be moved this function executes a move operation
|
||||||
|
* to the minimal eligible node with respect to the
|
||||||
|
* weight function. If a shard is moved the shard will be set to
|
||||||
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
|
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
|
||||||
* shard is created with an incremented version in the state
|
* shard is created with an incremented version in the state
|
||||||
* {@link ShardRoutingState#INITIALIZING}.
|
* {@link ShardRoutingState#INITIALIZING}.
|
||||||
*
|
*
|
||||||
* @return <code>true</code> iff the shard has successfully been moved.
|
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
public boolean move(ShardRouting shard, RoutingNode node ) {
|
public boolean moveShards() {
|
||||||
if (nodes.isEmpty() || !shard.started()) {
|
if (nodes.isEmpty()) {
|
||||||
/* with no nodes or a not started shard this is pointless */
|
/* with no nodes this is pointless */
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Try moving shard [{}] from [{}]", shard, node);
|
// Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
|
||||||
|
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
|
||||||
|
// offloading the shards.
|
||||||
|
List<ShardRouting> shards = new ArrayList<>();
|
||||||
|
int index = 0;
|
||||||
|
boolean found = true;
|
||||||
|
while (found) {
|
||||||
|
found = false;
|
||||||
|
for (RoutingNode routingNode : routingNodes) {
|
||||||
|
if (index >= routingNode.size()) {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
found = true;
|
||||||
|
ShardRouting shardRouting = routingNode.get(index);
|
||||||
|
// we can only move started shards...
|
||||||
|
if (shardRouting.started()) {
|
||||||
|
shards.add(shardRouting);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
if (shards.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
||||||
boolean changed = initialize(routingNodes, unassigned);
|
boolean changed = initialize(routingNodes, unassigned);
|
||||||
if (!changed) {
|
if (changed == false) {
|
||||||
final ModelNode sourceNode = nodes.get(node.nodeId());
|
|
||||||
assert sourceNode != null;
|
|
||||||
final NodeSorter sorter = newNodeSorter();
|
final NodeSorter sorter = newNodeSorter();
|
||||||
sorter.reset(shard.getIndexName());
|
final ModelNode[] modelNodes = sorter.modelNodes;
|
||||||
final ModelNode[] nodes = sorter.modelNodes;
|
for (ShardRouting shardRouting : shards) {
|
||||||
assert sourceNode.containsShard(shard);
|
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
||||||
|
assert sourceNode != null && sourceNode.containsShard(shardRouting);
|
||||||
|
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
|
||||||
|
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
||||||
|
if (decision.type() == Decision.Type.NO) {
|
||||||
|
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
||||||
|
sorter.reset(shardRouting.getIndexName());
|
||||||
/*
|
/*
|
||||||
* the sorter holds the minimum weight node first for the shards index.
|
* the sorter holds the minimum weight node first for the shards index.
|
||||||
* We now walk through the nodes until we find a node to allocate the shard.
|
* We now walk through the nodes until we find a node to allocate the shard.
|
||||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||||
* allocate on the minimal eligible node.
|
* allocate on the minimal eligible node.
|
||||||
*/
|
*/
|
||||||
|
boolean moved = false;
|
||||||
for (ModelNode currentNode : nodes) {
|
for (ModelNode currentNode : modelNodes) {
|
||||||
if (currentNode.getNodeId().equals(node.nodeId())) {
|
if (currentNode == sourceNode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
RoutingNode target = currentNode.getRoutingNode(routingNodes);
|
RoutingNode target = currentNode.getRoutingNode(routingNodes);
|
||||||
Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation);
|
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
||||||
Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
|
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
|
||||||
Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||||
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
Decision sourceDecision = sourceNode.removeShard(shardRouting);
|
||||||
sourceNode.removeShard(shard);
|
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||||
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
// re-add (now relocating shard) to source node
|
||||||
currentNode.addShard(targetRelocatingShard, decision);
|
sourceNode.addShard(shardRouting, sourceDecision);
|
||||||
|
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
||||||
|
currentNode.addShard(targetRelocatingShard, targetDecision);
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
|
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
|
||||||
}
|
}
|
||||||
|
moved = true;
|
||||||
changed = true;
|
changed = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (moved == false) {
|
||||||
|
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||||
|
@ -66,12 +65,10 @@ public interface ShardsAllocator {
|
||||||
boolean rebalance(RoutingAllocation allocation);
|
boolean rebalance(RoutingAllocation allocation);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Moves a shard from the given node to other node.
|
* Move started shards that can not be allocated to a node anymore
|
||||||
*
|
*
|
||||||
* @param shardRouting the shard to move
|
|
||||||
* @param node A node containing the shard
|
|
||||||
* @param allocation current node allocation
|
* @param allocation current node allocation
|
||||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
|
boolean moveShards(RoutingAllocation allocation);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||||
|
@ -96,7 +94,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
public boolean moveShards(RoutingAllocation allocation) {
|
||||||
return allocator.move(shardRouting, node, allocation);
|
return allocator.moveShards(allocation);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
public boolean moveShards(RoutingAllocation allocation) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -320,7 +320,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
public boolean moveShards(RoutingAllocation allocation) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue