From 64e84dcc76660f3d62c0901137fb01980891f0f5 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 Mar 2016 17:21:41 +0100 Subject: [PATCH] Reuse shard model across 3 phases in BalancedShardsAllocator --- .../cluster/routing/RoutingNodes.java | 7 + .../allocator/BalancedShardsAllocator.java | 486 ++++++++---------- .../allocation/allocator/ShardsAllocator.java | 9 +- 3 files changed, 228 insertions(+), 274 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 4f2f9d06097..a6ef564904c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -597,6 +597,13 @@ public class RoutingNodes implements Iterable { } + /** + * Returns the number of routing nodes + */ + public int size() { + return nodesToShards.size(); + } + public static final class UnassignedShards implements Iterable { private final RoutingNodes nodes; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 3e5b0847b0a..6377e06e245 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -28,9 +27,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import 
org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; @@ -42,18 +39,14 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.PriorityComparator; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Predicate; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -104,24 +97,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards @Override public boolean allocate(RoutingAllocation allocation) { + if (allocation.routingNodes().size() == 0) { + /* with no nodes this is pointless */ + return false; + } final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); - boolean changed = false; - if (allocation.routingNodes().unassigned().size() > 0) { - changed |= balancer.allocateUnassigned(); - } + boolean changed = balancer.allocateUnassigned(); changed |= balancer.moveShards(); - if (allocation.hasPendingAsyncFetch() == false) { - /* - * see https://github.com/elastic/elasticsearch/issues/14387 - * if we allow rebalance operations while we are still fetching shard store data - * we might end up with unnecessary rebalance operations which can be super confusion/frustrating - * since once the 
fetches come back we might just move all the shards back again. - * Therefore we only do a rebalance if we have fetched all information. - */ - changed |= balancer.balance(); - } else { - logger.debug("skipping rebalance due to in-flight shard/store fetches"); - } + changed |= balancer.balance(); return changed; } @@ -202,8 +185,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) { - final float weightShard = (node.numShards() + numAdditionalShards - balancer.avgShardsPerNode()); - final float weightIndex = (node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index)); + final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode(); + final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; } @@ -215,7 +198,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards public static class Balancer { private final ESLogger logger; private final Map nodes = new HashMap<>(); - private final HashSet indices = new HashSet<>(); private final RoutingAllocation allocation; private final RoutingNodes routingNodes; private final WeightFunction weight; @@ -224,19 +206,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private final MetaData metaData; private final float avgShardsPerNode; - private final Predicate assignedFilter = shard -> shard.assignedToNode(); - public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { this.logger = logger; this.allocation = allocation; this.weight = weight; this.threshold = threshold; this.routingNodes = allocation.routingNodes(); - for (RoutingNode node : routingNodes) { - nodes.put(node.nodeId(), new ModelNode(node.nodeId())); - } metaData = 
routingNodes.metaData(); - avgShardsPerNode = ((float) metaData.totalNumberOfShards()) / nodes.size(); + avgShardsPerNode = ((float) metaData.totalNumberOfShards()) / routingNodes.size(); + buildModelFromAssigned(); } /** @@ -270,17 +248,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return new NodeSorter(nodesArray(), weight, this); } - private boolean initialize(RoutingNodes routing, RoutingNodes.UnassignedShards unassigned) { - if (logger.isTraceEnabled()) { - logger.trace("Start distributing Shards"); - } - for (ObjectCursor index : allocation.routingTable().indicesRouting().keys()) { - indices.add(index.value); - } - buildModelFromAssigned(routing.shards(assignedFilter)); - return allocateUnassigned(unassigned); - } - private static float absDelta(float lower, float higher) { assert higher >= lower : higher + " lt " + lower +" but was expected to be gte"; return Math.abs(higher - lower); @@ -294,12 +261,36 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } /** - * Allocates all possible unassigned shards + * Balances the nodes on the cluster model according to the weight function. + * The actual balancing is delegated to {@link #balanceByWeights()} + * * @return true if the current configuration has been * changed, otherwise false */ - final boolean allocateUnassigned() { - return balance(true); + private boolean balance() { + if (logger.isTraceEnabled()) { + logger.trace("Start balancing cluster"); + } + if (allocation.hasPendingAsyncFetch()) { + /* + * see https://github.com/elastic/elasticsearch/issues/14387 + * if we allow rebalance operations while we are still fetching shard store data + * we might end up with unnecessary rebalance operations which can be super confusion/frustrating + * since once the fetches come back we might just move all the shards back again. + * Therefore we only do a rebalance if we have fetched all information. 
+ */ + logger.debug("skipping rebalance due to in-flight shard/store fetches"); + return false; + } + if (allocation.deciders().canRebalance(allocation).type() != Type.YES) { + logger.trace("skipping rebalance as it is disabled"); + return false; + } + if (nodes.size() < 2) { /* skip if we only have one node */ + logger.trace("skipping rebalance as single node only"); + return false; + } + return balanceByWeights(); } /** @@ -316,120 +307,100 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * @return true if the current configuration has been * changed, otherwise false */ - public boolean balance() { - return balance(false); - } + private boolean balanceByWeights() { + boolean changed = false; + final NodeSorter sorter = newNodeSorter(); + final AllocationDeciders deciders = allocation.deciders(); + final ModelNode[] modelNodes = sorter.modelNodes; + final float[] weights = sorter.weights; + for (String index : buildWeightOrderedIndices(sorter)) { + IndexMetaData indexMetaData = metaData.index(index); - private boolean balance(boolean onlyAssign) { - if (this.nodes.isEmpty()) { - /* with no nodes this is pointless */ - return false; - } - if (logger.isTraceEnabled()) { - if (onlyAssign) { - logger.trace("Start balancing cluster"); - } else { - logger.trace("Start assigning unassigned shards"); + // find nodes that have a shard of this index or where shards of this index are allowed to stay + // move these nodes to the front of modelNodes so that we can only balance based on these nodes + int relevantNodes = 0; + for (int i = 0; i < modelNodes.length; i++) { + ModelNode modelNode = modelNodes[i]; + if (modelNode.getIndex(index) != null + || deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(), allocation).type() != Type.NO) { + // swap nodes at position i and relevantNodes + modelNodes[i] = modelNodes[relevantNodes]; + modelNodes[relevantNodes] = modelNode; + relevantNodes++; + } } - } - final 
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); - boolean changed = initialize(routingNodes, unassigned); - if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) { - NodeSorter sorter = newNodeSorter(); - if (nodes.size() > 1) { /* skip if we only have one node */ - AllocationDeciders deciders = allocation.deciders(); - final ModelNode[] modelNodes = sorter.modelNodes; - final float[] weights = sorter.weights; - for (String index : buildWeightOrderedIndices(sorter)) { - IndexMetaData indexMetaData = metaData.index(index); - // find nodes that have a shard of this index or where shards of this index are allowed to stay - // move these nodes to the front of modelNodes so that we can only balance based on these nodes - int relevantNodes = 0; - for (int i = 0; i < modelNodes.length; i++) { - ModelNode modelNode = modelNodes[i]; - if (modelNode.getIndex(index) != null - || deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(routingNodes), allocation).type() != Type.NO) { - // swap nodes at position i and relevantNodes - modelNodes[i] = modelNodes[relevantNodes]; - modelNodes[relevantNodes] = modelNode; - relevantNodes++; + if (relevantNodes < 2) { + continue; + } + + sorter.reset(index, 0, relevantNodes); + int lowIdx = 0; + int highIdx = relevantNodes - 1; + while (true) { + final ModelNode minNode = modelNodes[lowIdx]; + final ModelNode maxNode = modelNodes[highIdx]; + advance_range: + if (maxNode.numShards(index) > 0) { + final float delta = absDelta(weights[lowIdx], weights[highIdx]); + if (lessThan(delta, threshold)) { + if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta? + && (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all + ) { + /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible + * due to some allocation decider restrictions like zone awareness. 
if one zone has for instance + * fewer nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we + * can't move to the "lighter" shards since otherwise the zone would go over capacity. + * + * This break jumps straight to the condition below where we start moving from the high index towards + * the low index to shrink the window we are considering for balance from the other direction. + * (check shrinking the window from MAX to MIN) + * See #3580 + */ + break advance_range; } + if (logger.isTraceEnabled()) { + logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]", + index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); + } + break; } - - if (relevantNodes < 2) { + if (logger.isTraceEnabled()) { + logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", + maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); + } + /* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes. 
+ * a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */ + if (tryRelocateShard(minNode, maxNode, index, delta)) { + /* + * TODO we could be a bit smarter here, we don't need to fully sort necessarily + * we could just find the place to insert linearly but the win might be minor + * compared to the added complexity + */ + weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); + weights[highIdx] = sorter.weight(modelNodes[highIdx]); + sorter.sort(0, relevantNodes); + lowIdx = 0; + highIdx = relevantNodes - 1; + changed = true; continue; } - - sorter.reset(index, 0, relevantNodes); - int lowIdx = 0; - int highIdx = relevantNodes - 1; - while (true) { - final ModelNode minNode = modelNodes[lowIdx]; - final ModelNode maxNode = modelNodes[highIdx]; - advance_range: - if (maxNode.numShards(index) > 0) { - final float delta = absDelta(weights[lowIdx], weights[highIdx]); - if (lessThan(delta, threshold)) { - if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta? - && (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all - ) { - /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible - * due to some allocation decider restrictions like zone awareness. if one zone has for instance - * less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we - * can't move to the "lighter" shards since otherwise the zone would go over capacity. - * - * This break jumps straight to the condition below were we start moving from the high index towards - * the low index to shrink the window we are considering for balance from the other direction. 
- * (check shrinking the window from MAX to MIN) - * See #3580 - */ - break advance_range; - } - if (logger.isTraceEnabled()) { - logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]", - index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); - } - break; - } - if (logger.isTraceEnabled()) { - logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", - maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); - } - /* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes. - * a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */ - if (tryRelocateShard(minNode, maxNode, index, delta)) { - /* - * TODO we could be a bit smarter here, we don't need to fully sort necessarily - * we could just find the place to insert linearly but the win might be minor - * compared to the added complexity - */ - weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); - weights[highIdx] = sorter.weight(modelNodes[highIdx]); - sorter.sort(0, relevantNodes); - lowIdx = 0; - highIdx = relevantNodes - 1; - changed = true; - continue; - } - } - if (lowIdx < highIdx - 1) { - /* Shrinking the window from MIN to MAX - * we can't move from any shard from the min node lets move on to the next node - * and see if the threshold still holds. 
We either don't have any shard of this - * index on this node of allocation deciders prevent any relocation.*/ - lowIdx++; - } else if (lowIdx > 0) { - /* Shrinking the window from MAX to MIN - * now we go max to min since obviously we can't move anything to the max node - * lets pick the next highest */ - lowIdx = 0; - highIdx--; - } else { - /* we are done here, we either can't relocate anymore or we are balanced */ - break; - } - } + } + if (lowIdx < highIdx - 1) { + /* Shrinking the window from MIN to MAX + * we can't move from any shard from the min node lets move on to the next node + * and see if the threshold still holds. We either don't have any shard of this + * index on this node or allocation deciders prevent any relocation.*/ + lowIdx++; + } else if (lowIdx > 0) { + /* Shrinking the window from MAX to MIN + * now we go max to min since obviously we can't move anything to the max node + * lets pick the next highest */ + lowIdx = 0; + highIdx--; + } else { + /* we are done here, we either can't relocate anymore or we are balanced */ + break; } } } @@ -450,7 +421,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * to the nodes we relocated them from. */ private String[] buildWeightOrderedIndices(NodeSorter sorter) { - final String[] indices = this.indices.toArray(new String[this.indices.size()]); + final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); final float[] deltas = new float[indices.length]; for (int i = 0; i < deltas.length; i++) { sorter.reset(indices[i]); @@ -502,20 +473,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * @return true if the allocation has changed, otherwise false */ public boolean moveShards() { - if (nodes.isEmpty()) { - /* with no nodes this is pointless */ - return false; - } - - // Create a copy of the started shards interleaving between nodes, and check if they can remain. 
In the presence of throttling + // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. - List shards = new ArrayList<>(); + boolean changed = false; int index = 0; boolean found = true; + final NodeSorter sorter = newNodeSorter(); while (found) { found = false; - for (RoutingNode routingNode : routingNodes) { + for (RoutingNode routingNode : allocation.routingNodes()) { if (index >= routingNode.size()) { continue; } @@ -523,64 +490,52 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards ShardRouting shardRouting = routingNode.get(index); // we can only move started shards... if (shardRouting.started()) { - shards.add(shardRouting); + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + assert sourceNode != null && sourceNode.containsShard(shardRouting); + Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (decision.type() == Decision.Type.NO) { + changed |= moveShard(sorter, shardRouting, sourceNode, routingNode); + } } } index++; } - if (shards.isEmpty()) { - return false; - } - final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); - boolean changed = initialize(routingNodes, unassigned); - if (changed == false) { - final NodeSorter sorter = newNodeSorter(); - final ModelNode[] modelNodes = sorter.modelNodes; - for (ShardRouting shardRouting : shards) { - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - assert sourceNode != null && sourceNode.containsShard(shardRouting); - final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes); - Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (decision.type() == Decision.Type.NO) { - logger.debug("[{}][{}] allocated 
on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); - sorter.reset(shardRouting.getIndexName()); - /* - * the sorter holds the minimum weight node first for the shards index. - * We now walk through the nodes until we find a node to allocate the shard. - * This is not guaranteed to be balanced after this operation we still try best effort to - * allocate on the minimal eligible node. - */ - boolean moved = false; - for (ModelNode currentNode : modelNodes) { - if (currentNode == sourceNode) { - continue; - } - RoutingNode target = currentNode.getRoutingNode(routingNodes); - Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); - Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation); - if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too? - Decision sourceDecision = sourceNode.removeShard(shardRouting); - ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); - // re-add (now relocating shard) to source node - sourceNode.addShard(shardRouting, sourceDecision); - Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); - currentNode.addShard(targetRelocatingShard, targetDecision); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node()); - } - moved = true; - changed = true; - break; - } - } - if (moved == false) { - logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + return changed; + } + + /** + * Move started shard to the minimal eligible node with respect to the weight function + * + * @return true if the shard was moved successfully, otherwise false + */ + private boolean 
moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) { + logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); + sorter.reset(shardRouting.getIndexName()); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligible node. + */ + for (ModelNode currentNode : sorter.modelNodes) { + if (currentNode != sourceNode) { + RoutingNode target = currentNode.getRoutingNode(); + Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); + Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation); + if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too? + sourceNode.removeShard(shardRouting); + ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + currentNode.addShard(targetRelocatingShard); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node()); } + return true; } } } - return changed; + logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + return false; } /** @@ -592,18 +547,19 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * on the target node which we respect during the allocation / balancing * process. In short, this method recreates the status-quo in the cluster. 
*/ - private void buildModelFromAssigned(Iterable shards) { - for (ShardRouting shard : shards) { - assert shard.assignedToNode(); - /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (shard.state() == RELOCATING) { - continue; - } - ModelNode node = nodes.get(shard.currentNodeId()); - assert node != null; - node.addShard(shard, Decision.single(Type.YES, "Already allocated on node", node.getNodeId())); - if (logger.isTraceEnabled()) { - logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); + private void buildModelFromAssigned() { + for (RoutingNode rn : routingNodes) { + ModelNode node = new ModelNode(rn); + nodes.put(rn.nodeId(), node); + for (ShardRouting shard : rn) { + assert rn.nodeId().equals(shard.currentNodeId()); + /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ + if (shard.state() != RELOCATING) { + node.addShard(shard); + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); + } + } } } } @@ -611,8 +567,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards /** * Allocates all given shards on the minimal eligible node for the shards index * with respect to the weight function. All given shards must be unassigned. 
+ * @return true if the current configuration has been + * changed, otherwise false */ - private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned) { + private boolean allocateUnassigned() { + RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); assert !nodes.isEmpty(); if (logger.isTraceEnabled()) { logger.trace("Start allocating unassigned shards"); @@ -656,7 +615,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards int secondaryLength = 0; int primaryLength = primary.length; ArrayUtil.timSort(primary, comparator); - final Set throttledNodes = Collections.newSetFromMap(new IdentityHashMap()); + final Set throttledNodes = Collections.newSetFromMap(new IdentityHashMap<>()); do { for (int i = 0; i < primaryLength; i++) { ShardRouting shard = primary[i]; @@ -694,7 +653,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * don't check deciders */ if (currentWeight <= minWeight) { - Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(routingNodes), allocation); + Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(), allocation); NOUPDATE: if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { if (currentWeight == minWeight) { @@ -735,7 +694,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } assert decision != null && minNode != null || decision == null && minNode == null; if (minNode != null) { - minNode.addShard(shard, decision); + minNode.addShard(shard); if (decision.type() == Type.YES) { if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); @@ -744,7 +703,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards changed = true; continue; // don't add to ignoreUnassigned } else { - final RoutingNode node = minNode.getRoutingNode(routingNodes); + final RoutingNode node = 
minNode.getRoutingNode(); if (deciders.canAllocate(node, allocation).type() != Type.YES) { if (logger.isTraceEnabled()) { logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type()); @@ -790,10 +749,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } ShardRouting candidate = null; final AllocationDeciders deciders = allocation.deciders(); - for (ShardRouting shard : index.getAllShards()) { + for (ShardRouting shard : index) { if (shard.started()) { // skip initializing, unassigned and relocating shards we can't relocate them anyway - Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(routingNodes), allocation); + Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); Decision rebalanceDecision = deciders.canRebalance(shard, allocation); if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE)) && ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) { @@ -814,24 +773,17 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } if (candidate != null) { - /* allocate on the model even if not throttled */ maxNode.removeShard(candidate); - minNode.addShard(candidate, decision); + minNode.addShard(candidate); if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */ if (logger.isTraceEnabled()) { logger.trace("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(), minNode.getNodeId()); } - /* now allocate on the cluster - if we are started we need to relocate the shard */ - if (candidate.started()) { - routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); - - } else { - routingNodes.initialize(candidate, minNode.getNodeId(), null, 
allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); - } + /* now allocate on the cluster */ + routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); return true; - } } } @@ -845,14 +797,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } static class ModelNode implements Iterable { - private final String id; private final Map indices = new HashMap<>(); private int numShards = 0; - // lazily calculated - private RoutingNode routingNode; + private final RoutingNode routingNode; - public ModelNode(String id) { - this.id = id; + public ModelNode(RoutingNode routingNode) { + this.routingNode = routingNode; } public ModelIndex getIndex(String indexId) { @@ -860,13 +810,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } public String getNodeId() { - return id; + return routingNode.nodeId(); } - public RoutingNode getRoutingNode(RoutingNodes routingNodes) { - if (routingNode == null) { - routingNode = routingNodes.node(id); - } + public RoutingNode getRoutingNode() { return routingNode; } @@ -887,33 +834,31 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return -1; } - public void addShard(ShardRouting shard, Decision decision) { + public void addShard(ShardRouting shard) { ModelIndex index = indices.get(shard.getIndexName()); if (index == null) { index = new ModelIndex(shard.getIndexName()); indices.put(index.getIndexId(), index); } - index.addShard(shard, decision); + index.addShard(shard); numShards++; } - public Decision removeShard(ShardRouting shard) { + public void removeShard(ShardRouting shard) { ModelIndex index = indices.get(shard.getIndexName()); - Decision removed = null; if (index != null) { - removed = index.removeShard(shard); - if (removed != null && index.numShards() == 0) { + index.removeShard(shard); + if 
(index.numShards() == 0) { indices.remove(shard.getIndexName()); } } numShards--; - return removed; } @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("Node(").append(id).append(")"); + sb.append("Node(").append(routingNode.nodeId()).append(")"); return sb.toString(); } @@ -929,9 +874,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } - static final class ModelIndex { + static final class ModelIndex implements Iterable { private final String id; - private final Map shards = new HashMap<>(); + private final Set shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node private int highestPrimary = -1; public ModelIndex(String id) { @@ -941,7 +886,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards public int highestPrimary() { if (highestPrimary == -1) { int maxId = -1; - for (ShardRouting shard : shards.keySet()) { + for (ShardRouting shard : shards) { if (shard.primary()) { maxId = Math.max(maxId, shard.id()); } @@ -959,24 +904,25 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return shards.size(); } - public Collection getAllShards() { - return shards.keySet(); + @Override + public Iterator iterator() { + return shards.iterator(); } - public Decision removeShard(ShardRouting shard) { + public void removeShard(ShardRouting shard) { highestPrimary = -1; - return shards.remove(shard); + assert shards.contains(shard) : "Shard not allocated on current node: " + shard; + shards.remove(shard); } - public void addShard(ShardRouting shard, Decision decision) { + public void addShard(ShardRouting shard) { highestPrimary = -1; - assert decision != null; - assert !shards.containsKey(shard) : "Shard already allocated on current node: " + shards.get(shard) + " " + shard; - shards.put(shard, decision); + assert !shards.contains(shard) : "Shard already allocated on current node: " + shard; + 
shards.add(shard); } public boolean containsShard(ShardRouting shard) { - return shards.containsKey(shard); + return shards.contains(shard); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 2656e2e3167..0bf07e8cba9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -25,15 +25,16 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; *

* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster. * The allocator makes basic decision where a shard instance will be allocated, if already allocated instances - * need relocate to other nodes due to node failures or due to rebalancing decisions. + * need to relocate to other nodes due to node failures or due to rebalancing decisions. *

*/ public interface ShardsAllocator { /** - * Assign all unassigned shards to nodes - * Move started shards that can not be allocated to a node anymore - * Rebalancing number of shards on all nodes + * Allocates shards to nodes in the cluster. An implementation of this method should: + * - assign unassigned shards + * - relocate shards that cannot stay on a node anymore + * - relocate shards to find a good shard balance in the cluster * * @param allocation current node allocation * @return true if the allocation has changed, otherwise false