Cache result of RoutingNodes.node(...) in ModelNode
This commit is contained in:
parent
eea791de15
commit
7e134da7d3
|
@ -349,7 +349,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
for (int i = 0; i < modelNodes.length; i++) {
|
||||
ModelNode modelNode = modelNodes[i];
|
||||
if (modelNode.getIndex(index) != null
|
||||
|| deciders.canAllocate(indexMetaData, routingNodes.node(modelNode.getNodeId()), allocation).type() != Type.NO) {
|
||||
|| deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(routingNodes), allocation).type() != Type.NO) {
|
||||
// swap nodes at position i and relevantNodes
|
||||
modelNodes[i] = modelNodes[relevantNodes];
|
||||
modelNodes[relevantNodes] = modelNode;
|
||||
|
@ -527,7 +527,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
if (currentNode.getNodeId().equals(node.nodeId())) {
|
||||
continue;
|
||||
}
|
||||
RoutingNode target = routingNodes.node(currentNode.getNodeId());
|
||||
RoutingNode target = currentNode.getRoutingNode(routingNodes);
|
||||
Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation);
|
||||
Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
|
||||
Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
||||
|
@ -661,7 +661,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
* don't check deciders
|
||||
*/
|
||||
if (currentWeight <= minWeight) {
|
||||
Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation);
|
||||
Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(routingNodes), allocation);
|
||||
NOUPDATE:
|
||||
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
|
||||
if (currentWeight == minWeight) {
|
||||
|
@ -707,11 +707,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
||||
}
|
||||
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
routingNodes.initialize(shard, minNode.getNodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
changed = true;
|
||||
continue; // don't add to ignoreUnassigned
|
||||
} else {
|
||||
final RoutingNode node = routingNodes.node(minNode.getNodeId());
|
||||
final RoutingNode node = minNode.getRoutingNode(routingNodes);
|
||||
if (deciders.canAllocate(node, allocation).type() != Type.YES) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
|
||||
|
@ -755,13 +755,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
logger.trace("Try relocating shard for index index [{}] from node [{}] to node [{}]", idx, maxNode.getNodeId(),
|
||||
minNode.getNodeId());
|
||||
}
|
||||
final RoutingNode node = routingNodes.node(minNode.getNodeId());
|
||||
ShardRouting candidate = null;
|
||||
final AllocationDeciders deciders = allocation.deciders();
|
||||
for (ShardRouting shard : index.getAllShards()) {
|
||||
if (shard.started()) {
|
||||
// skip initializing, unassigned and relocating shards we can't relocate them anyway
|
||||
Decision allocationDecision = deciders.canAllocate(shard, node, allocation);
|
||||
Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(routingNodes), allocation);
|
||||
Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
|
||||
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
|
||||
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
|
||||
|
@ -793,11 +792,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
}
|
||||
/* now allocate on the cluster - if we are started we need to relocate the shard */
|
||||
if (candidate.started()) {
|
||||
RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId());
|
||||
routingNodes.relocate(candidate, lowRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
routingNodes.relocate(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
|
||||
} else {
|
||||
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
routingNodes.initialize(candidate, minNode.getNodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
}
|
||||
return true;
|
||||
|
||||
|
@ -817,6 +815,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
private final String id;
|
||||
private final Map<String, ModelIndex> indices = new HashMap<>();
|
||||
private int numShards = 0;
|
||||
// lazily calculated
|
||||
private RoutingNode routingNode;
|
||||
|
||||
public ModelNode(String id) {
|
||||
this.id = id;
|
||||
|
@ -830,6 +830,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
return id;
|
||||
}
|
||||
|
||||
public RoutingNode getRoutingNode(RoutingNodes routingNodes) {
|
||||
if (routingNode == null) {
|
||||
routingNode = routingNodes.node(id);
|
||||
}
|
||||
return routingNode;
|
||||
}
|
||||
|
||||
public int numShards() {
|
||||
return numShards;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue