move rebalance only when active logic into node allocation, add canRebalance hook point

This commit is contained in:
kimchy 2010-08-23 18:26:17 +03:00
parent 0e47898902
commit 4300a6ca18
9 changed files with 87 additions and 26 deletions

View File

@ -23,15 +23,17 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
/** /**
* A pluggable logic allowing to control if allocation of a shard is allowed on a specific node. * A pluggable logic allowing to control if allocation of a shard is allowed on a specific node.
* *
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public interface NodeAllocation { public abstract class NodeAllocation extends AbstractComponent {
enum Decision { public static enum Decision {
YES { YES {
@Override public boolean allocate() { @Override public boolean allocate() {
return true; return true;
@ -48,7 +50,19 @@ public interface NodeAllocation {
public abstract boolean allocate(); public abstract boolean allocate();
} }
boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes); protected NodeAllocation(Settings settings) {
super(settings);
}
Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes); public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return true;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
return Decision.YES;
}
} }

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -35,7 +34,7 @@ import java.util.Set;
* *
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class NodeAllocations extends AbstractComponent implements NodeAllocation { public class NodeAllocations extends NodeAllocation {
private final NodeAllocation[] allocations; private final NodeAllocation[] allocations;
@ -44,6 +43,7 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
.add(new SameShardNodeAllocation(settings)) .add(new SameShardNodeAllocation(settings))
.add(new ReplicaAfterPrimaryActiveNodeAllocation(settings)) .add(new ReplicaAfterPrimaryActiveNodeAllocation(settings))
.add(new ThrottlingNodeAllocation(settings)) .add(new ThrottlingNodeAllocation(settings))
.add(new RebalanceOnlyWhenActiveNodeAllocation(settings))
.build() .build()
); );
} }
@ -53,6 +53,15 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
} }
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
for (NodeAllocation allocation : allocations) {
if (!allocation.canRebalance(shardRouting, routingNodes, nodes)) {
return false;
}
}
return true;
}
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false; boolean changed = false;
for (NodeAllocation allocation : allocations) { for (NodeAllocation allocation : allocations) {

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
/**
* Only allow rebalancing when all shards are active within the shard replication group.
*
* @author kimchy (shay.banon)
*/
public class RebalanceOnlyWhenActiveNodeAllocation extends NodeAllocation {
public RebalanceOnlyWhenActiveNodeAllocation(Settings settings) {
super(settings);
}
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
List<MutableShardRouting> shards = routingNodes.shardsRoutingFor(shardRouting);
for (ShardRouting allShard : shards) {
if (!allShard.active()) {
return false;
}
}
return true;
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -33,7 +32,7 @@ import org.elasticsearch.common.settings.Settings;
* *
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class ReplicaAfterPrimaryActiveNodeAllocation extends AbstractComponent implements NodeAllocation { public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation {
@Inject public ReplicaAfterPrimaryActiveNodeAllocation(Settings settings) { @Inject public ReplicaAfterPrimaryActiveNodeAllocation(Settings settings) {
super(settings); super(settings);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -33,7 +32,7 @@ import org.elasticsearch.common.settings.Settings;
* *
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class SameShardNodeAllocation extends AbstractComponent implements NodeAllocation { public class SameShardNodeAllocation extends NodeAllocation {
@Inject public SameShardNodeAllocation(Settings settings) { @Inject public SameShardNodeAllocation(Settings settings) {
super(settings); super(settings);

View File

@ -47,6 +47,7 @@ public class ShardAllocationModule extends AbstractModule {
allocationMultibinder.addBinding().to(SameShardNodeAllocation.class); allocationMultibinder.addBinding().to(SameShardNodeAllocation.class);
allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class); allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class);
allocationMultibinder.addBinding().to(ThrottlingNodeAllocation.class); allocationMultibinder.addBinding().to(ThrottlingNodeAllocation.class);
allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveNodeAllocation.class);
for (Class<? extends NodeAllocation> allocation : allocations) { for (Class<? extends NodeAllocation> allocation : allocations) {
allocationMultibinder.addBinding().to(allocation); allocationMultibinder.addBinding().to(allocation);
} }

View File

@ -120,12 +120,12 @@ public class ShardsAllocation extends AbstractComponent {
} }
// rebalance // rebalance
changed |= rebalance(routingNodes); changed |= rebalance(routingNodes, nodes);
return changed; return changed;
} }
private boolean rebalance(RoutingNodes routingNodes) { private boolean rebalance(RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false; boolean changed = false;
List<RoutingNode> sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh(); List<RoutingNode> sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh();
if (sortedNodesLeastToHigh.isEmpty()) { if (sortedNodesLeastToHigh.isEmpty()) {
@ -155,16 +155,7 @@ public class ShardsAllocation extends AbstractComponent {
boolean relocated = false; boolean relocated = false;
List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED); List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting startedShard : startedShards) { for (MutableShardRouting startedShard : startedShards) {
// we only relocate shards that all other shards within the replication group are active if (!nodeAllocations.canRebalance(startedShard, routingNodes, nodes)) {
List<MutableShardRouting> allShards = routingNodes.shardsRoutingFor(startedShard);
boolean ignoreShard = false;
for (MutableShardRouting allShard : allShards) {
if (!allShard.active()) {
ignoreShard = true;
break;
}
}
if (ignoreShard) {
continue; continue;
} }

View File

@ -21,14 +21,13 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class ThrottlingNodeAllocation extends AbstractComponent implements NodeAllocation { public class ThrottlingNodeAllocation extends NodeAllocation {
private final int concurrentRecoveries; private final int concurrentRecoveries;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation; import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -47,7 +46,7 @@ import java.util.Iterator;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class BlobReuseExistingNodeAllocation extends AbstractComponent implements NodeAllocation { public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final IndicesService indicesService; private final IndicesService indicesService;