externalize shard allocation decision to a separate module
This commit is contained in:
parent
d86c116273
commit
203564a5b0
|
@ -20,12 +20,13 @@
|
|||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.SpawnModules;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -37,34 +38,17 @@ public class AllocationModule extends AbstractModule implements SpawnModules {
|
|||
|
||||
private final Settings settings;
|
||||
|
||||
private List<Class<? extends NodeAllocation>> allocations = Lists.newArrayList();
|
||||
private List<Class<? extends AllocationDecider>> allocations = Lists.newArrayList();
|
||||
|
||||
public AllocationModule(Settings settings) {
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
@Override public Iterable<? extends Module> spawnModules() {
|
||||
return ImmutableList.of(new ShardsAllocatorModule(settings));
|
||||
}
|
||||
|
||||
public void addNodeAllocation(Class<? extends NodeAllocation> nodeAllocation) {
|
||||
allocations.add(nodeAllocation);
|
||||
return ImmutableList.of(new ShardsAllocatorModule(settings), new AllocationDecidersModule(settings));
|
||||
}
|
||||
|
||||
@Override protected void configure() {
|
||||
bind(AllocationService.class).asEagerSingleton();
|
||||
|
||||
Multibinder<NodeAllocation> allocationMultibinder = Multibinder.newSetBinder(binder(), NodeAllocation.class);
|
||||
allocationMultibinder.addBinding().to(SameShardNodeAllocation.class);
|
||||
allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class);
|
||||
allocationMultibinder.addBinding().to(ThrottlingNodeAllocation.class);
|
||||
allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveNodeAllocation.class);
|
||||
allocationMultibinder.addBinding().to(ClusterRebalanceNodeAllocation.class);
|
||||
allocationMultibinder.addBinding().to(ConcurrentRebalanceNodeAllocation.class);
|
||||
for (Class<? extends NodeAllocation> allocation : allocations) {
|
||||
allocationMultibinder.addBinding().to(allocation);
|
||||
}
|
||||
|
||||
bind(NodeAllocations.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -47,7 +48,7 @@ import static org.elasticsearch.common.collect.Sets.*;
|
|||
*/
|
||||
public class AllocationService extends AbstractComponent {
|
||||
|
||||
private final NodeAllocations nodeAllocations;
|
||||
private final AllocationDeciders allocationDeciders;
|
||||
|
||||
private final ShardsAllocators shardsAllocators;
|
||||
|
||||
|
@ -57,14 +58,14 @@ public class AllocationService extends AbstractComponent {
|
|||
|
||||
public AllocationService(Settings settings) {
|
||||
this(settings,
|
||||
new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)),
|
||||
new AllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)),
|
||||
new ShardsAllocators(settings)
|
||||
);
|
||||
}
|
||||
|
||||
@Inject public AllocationService(Settings settings, NodeAllocations nodeAllocations, ShardsAllocators shardsAllocators) {
|
||||
@Inject public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators) {
|
||||
super(settings);
|
||||
this.nodeAllocations = nodeAllocations;
|
||||
this.allocationDeciders = allocationDeciders;
|
||||
this.shardsAllocators = shardsAllocators;
|
||||
}
|
||||
|
||||
|
@ -75,12 +76,12 @@ public class AllocationService extends AbstractComponent {
|
|||
*/
|
||||
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
StartedRerouteAllocation allocation = new StartedRerouteAllocation(routingNodes, clusterState.nodes(), startedShards);
|
||||
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards);
|
||||
boolean changed = applyStartedShards(routingNodes, startedShards);
|
||||
if (!changed) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
|
||||
}
|
||||
shardsAllocators.applyStartedShards(nodeAllocations, allocation);
|
||||
shardsAllocators.applyStartedShards(allocation);
|
||||
reroute(allocation);
|
||||
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
|
||||
}
|
||||
|
@ -92,12 +93,12 @@ public class AllocationService extends AbstractComponent {
|
|||
*/
|
||||
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShard);
|
||||
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard);
|
||||
boolean changed = applyFailedShard(allocation);
|
||||
if (!changed) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
|
||||
}
|
||||
shardsAllocators.applyFailedShards(nodeAllocations, allocation);
|
||||
shardsAllocators.applyFailedShards(allocation);
|
||||
reroute(allocation);
|
||||
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
|
||||
}
|
||||
|
@ -109,7 +110,7 @@ public class AllocationService extends AbstractComponent {
|
|||
*/
|
||||
public RoutingAllocation.Result reroute(ClusterState clusterState) {
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
RoutingAllocation allocation = new RoutingAllocation(routingNodes, clusterState.nodes());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
|
||||
if (!reroute(allocation)) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
|
||||
}
|
||||
|
@ -123,7 +124,7 @@ public class AllocationService extends AbstractComponent {
|
|||
*/
|
||||
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState) {
|
||||
RoutingNodes routingNodes = clusterState.routingNodes();
|
||||
RoutingAllocation allocation = new RoutingAllocation(routingNodes, clusterState.nodes());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
|
||||
Iterable<DiscoveryNode> dataNodes = allocation.nodes().dataNodes().values();
|
||||
boolean changed = false;
|
||||
// first, clear from the shards any node id they used to belong to that is now dead
|
||||
|
@ -158,13 +159,13 @@ public class AllocationService extends AbstractComponent {
|
|||
|
||||
// now allocate all the unassigned to available nodes
|
||||
if (allocation.routingNodes().hasUnassigned()) {
|
||||
changed |= shardsAllocators.allocateUnassigned(nodeAllocations, allocation);
|
||||
changed |= shardsAllocators.allocateUnassigned(allocation);
|
||||
// elect primaries again, in case this is needed with unassigned allocation
|
||||
changed |= electPrimaries(allocation.routingNodes());
|
||||
}
|
||||
|
||||
// rebalance
|
||||
changed |= shardsAllocators.rebalance(nodeAllocations, allocation);
|
||||
changed |= shardsAllocators.rebalance(allocation);
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -30,8 +31,8 @@ public class FailedRerouteAllocation extends RoutingAllocation {
|
|||
|
||||
private final ShardRouting failedShard;
|
||||
|
||||
public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) {
|
||||
super(routingNodes, nodes);
|
||||
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) {
|
||||
super(deciders, routingNodes, nodes);
|
||||
this.failedShard = failedShard;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
@ -59,6 +60,8 @@ public class RoutingAllocation {
|
|||
}
|
||||
}
|
||||
|
||||
private final AllocationDeciders deciders;
|
||||
|
||||
private final RoutingNodes routingNodes;
|
||||
|
||||
private final DiscoveryNodes nodes;
|
||||
|
@ -67,11 +70,16 @@ public class RoutingAllocation {
|
|||
|
||||
private Map<ShardId, String> ignoredShardToNodes = null;
|
||||
|
||||
public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) {
|
||||
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) {
|
||||
this.deciders = deciders;
|
||||
this.routingNodes = routingNodes;
|
||||
this.nodes = nodes;
|
||||
}
|
||||
|
||||
public AllocationDeciders deciders() {
|
||||
return this.deciders;
|
||||
}
|
||||
|
||||
public RoutingTable routingTable() {
|
||||
return routingNodes.routingTable();
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -32,8 +33,8 @@ public class StartedRerouteAllocation extends RoutingAllocation {
|
|||
|
||||
private final List<? extends ShardRouting> startedShards;
|
||||
|
||||
public StartedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
|
||||
super(routingNodes, nodes);
|
||||
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
|
||||
super(deciders, routingNodes, nodes);
|
||||
this.startedShards = startedShards;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
|
|||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -43,13 +42,13 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
|||
super(settings);
|
||||
}
|
||||
|
||||
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
|
||||
@Override public void applyStartedShards(StartedRerouteAllocation allocation) {
|
||||
}
|
||||
|
||||
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
|
||||
@Override public void applyFailedShards(FailedRerouteAllocation allocation) {
|
||||
}
|
||||
|
||||
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
@Override public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
RoutingNodes routingNodes = allocation.routingNodes();
|
||||
|
||||
|
@ -69,7 +68,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
|||
lastNode = 0;
|
||||
}
|
||||
|
||||
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
|
||||
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) {
|
||||
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
|
||||
if (numberOfShardsToAllocate <= 0) {
|
||||
continue;
|
||||
|
@ -88,7 +87,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
|||
MutableShardRouting shard = it.next();
|
||||
// go over the nodes and try and allocate the remaining ones
|
||||
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
|
||||
if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) {
|
||||
if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) {
|
||||
changed = true;
|
||||
routingNode.add(shard);
|
||||
it.remove();
|
||||
|
@ -99,7 +98,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
|||
return changed;
|
||||
}
|
||||
|
||||
@Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
@Override public boolean rebalance(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
|
||||
if (sortedNodesLeastToHigh.isEmpty()) {
|
||||
|
@ -129,11 +128,11 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
|
|||
boolean relocated = false;
|
||||
List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED);
|
||||
for (MutableShardRouting startedShard : startedShards) {
|
||||
if (!nodeAllocations.canRebalance(startedShard, allocation)) {
|
||||
if (!allocation.deciders().canRebalance(startedShard, allocation)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) {
|
||||
if (allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation).allocate()) {
|
||||
changed = true;
|
||||
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
|
||||
lowRoutingNode.nodeId(), startedShard.currentNodeId(),
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
|
||||
|
@ -29,9 +28,9 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
|||
*/
|
||||
public interface GatewayAllocator {
|
||||
|
||||
void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation);
|
||||
void applyStartedShards(StartedRerouteAllocation allocation);
|
||||
|
||||
void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation);
|
||||
void applyFailedShards(FailedRerouteAllocation allocation);
|
||||
|
||||
boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation);
|
||||
boolean allocateUnassigned(RoutingAllocation allocation);
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
|
||||
|
@ -28,11 +27,11 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
|||
*/
|
||||
public interface ShardsAllocator {
|
||||
|
||||
void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation);
|
||||
void applyStartedShards(StartedRerouteAllocation allocation);
|
||||
|
||||
void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation);
|
||||
void applyFailedShards(FailedRerouteAllocation allocation);
|
||||
|
||||
boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation);
|
||||
boolean allocateUnassigned(RoutingAllocation allocation);
|
||||
|
||||
boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation);
|
||||
boolean rebalance(RoutingAllocation allocation);
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -50,24 +49,24 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
|
|||
this.allocator = allocator;
|
||||
}
|
||||
|
||||
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
|
||||
gatewayAllocator.applyStartedShards(nodeAllocations, allocation);
|
||||
allocator.applyStartedShards(nodeAllocations, allocation);
|
||||
@Override public void applyStartedShards(StartedRerouteAllocation allocation) {
|
||||
gatewayAllocator.applyStartedShards(allocation);
|
||||
allocator.applyStartedShards(allocation);
|
||||
}
|
||||
|
||||
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
|
||||
gatewayAllocator.applyFailedShards(nodeAllocations, allocation);
|
||||
allocator.applyFailedShards(nodeAllocations, allocation);
|
||||
@Override public void applyFailedShards(FailedRerouteAllocation allocation) {
|
||||
gatewayAllocator.applyFailedShards(allocation);
|
||||
allocator.applyFailedShards(allocation);
|
||||
}
|
||||
|
||||
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
@Override public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
changed |= gatewayAllocator.allocateUnassigned(nodeAllocations, allocation);
|
||||
changed |= allocator.allocateUnassigned(nodeAllocations, allocation);
|
||||
changed |= gatewayAllocator.allocateUnassigned(allocation);
|
||||
changed |= allocator.allocateUnassigned(allocation);
|
||||
return changed;
|
||||
}
|
||||
|
||||
@Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
return allocator.rebalance(nodeAllocations, allocation);
|
||||
@Override public boolean rebalance(RoutingAllocation allocation) {
|
||||
return allocator.rebalance(allocation);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
|
@ -29,7 +30,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class NodeAllocation extends AbstractComponent {
|
||||
public abstract class AllocationDecider extends AbstractComponent {
|
||||
|
||||
public static enum Decision {
|
||||
YES {
|
||||
|
@ -51,7 +52,7 @@ public abstract class NodeAllocation extends AbstractComponent {
|
|||
public abstract boolean allocate();
|
||||
}
|
||||
|
||||
protected NodeAllocation(Settings settings) {
|
||||
protected AllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
|
@ -17,10 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -29,33 +30,31 @@ import org.elasticsearch.node.settings.NodeSettingsService;
|
|||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Holds several {@link NodeAllocation}s and combines them into a single allocation decision.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
* Holds several {@link AllocationDecider}s and combines them into a single allocation decision.
|
||||
*/
|
||||
public class NodeAllocations extends NodeAllocation {
|
||||
public class AllocationDeciders extends AllocationDecider {
|
||||
|
||||
private final NodeAllocation[] allocations;
|
||||
private final AllocationDecider[] allocations;
|
||||
|
||||
public NodeAllocations(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
this(settings, ImmutableSet.<NodeAllocation>builder()
|
||||
.add(new SameShardNodeAllocation(settings))
|
||||
.add(new ReplicaAfterPrimaryActiveNodeAllocation(settings))
|
||||
.add(new ThrottlingNodeAllocation(settings, nodeSettingsService))
|
||||
.add(new RebalanceOnlyWhenActiveNodeAllocation(settings))
|
||||
.add(new ClusterRebalanceNodeAllocation(settings))
|
||||
.add(new ConcurrentRebalanceNodeAllocation(settings))
|
||||
public AllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
this(settings, ImmutableSet.<AllocationDecider>builder()
|
||||
.add(new SameShardAllocationDecider(settings))
|
||||
.add(new ReplicaAfterPrimaryActiveAllocationDecider(settings))
|
||||
.add(new ThrottlingAllocationDecider(settings, nodeSettingsService))
|
||||
.add(new RebalanceOnlyWhenActiveAllocationDecider(settings))
|
||||
.add(new ClusterRebalanceAllocationDecider(settings))
|
||||
.add(new ConcurrentRebalanceAllocationDecider(settings))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
@Inject public NodeAllocations(Settings settings, Set<NodeAllocation> allocations) {
|
||||
@Inject public AllocationDeciders(Settings settings, Set<AllocationDecider> allocations) {
|
||||
super(settings);
|
||||
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
|
||||
this.allocations = allocations.toArray(new AllocationDecider[allocations.size()]);
|
||||
}
|
||||
|
||||
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
|
||||
for (NodeAllocation allocation1 : allocations) {
|
||||
for (AllocationDecider allocation1 : allocations) {
|
||||
if (!allocation1.canRebalance(shardRouting, allocation)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -70,7 +69,7 @@ public class NodeAllocations extends NodeAllocation {
|
|||
return Decision.NO;
|
||||
}
|
||||
// now, go over the registered allocations
|
||||
for (NodeAllocation allocation1 : allocations) {
|
||||
for (AllocationDecider allocation1 : allocations) {
|
||||
Decision decision = allocation1.canAllocate(shardRouting, node, allocation);
|
||||
if (decision == Decision.NO) {
|
||||
return Decision.NO;
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class AllocationDecidersModule extends AbstractModule {
|
||||
|
||||
private final Settings settings;
|
||||
|
||||
private List<Class<? extends AllocationDecider>> allocations = Lists.newArrayList();
|
||||
|
||||
public AllocationDecidersModule(Settings settings) {
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
@Override protected void configure() {
|
||||
Multibinder<AllocationDecider> allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class);
|
||||
allocationMultibinder.addBinding().to(SameShardAllocationDecider.class);
|
||||
allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveAllocationDecider.class);
|
||||
allocationMultibinder.addBinding().to(ThrottlingAllocationDecider.class);
|
||||
allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveAllocationDecider.class);
|
||||
allocationMultibinder.addBinding().to(ClusterRebalanceAllocationDecider.class);
|
||||
allocationMultibinder.addBinding().to(ConcurrentRebalanceAllocationDecider.class);
|
||||
for (Class<? extends AllocationDecider> allocation : allocations) {
|
||||
allocationMultibinder.addBinding().to(allocation);
|
||||
}
|
||||
|
||||
bind(AllocationDeciders.class).asEagerSingleton();
|
||||
}
|
||||
}
|
|
@ -17,15 +17,16 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
public class ClusterRebalanceNodeAllocation extends NodeAllocation {
|
||||
public class ClusterRebalanceAllocationDecider extends AllocationDecider {
|
||||
|
||||
public static enum ClusterRebalanceType {
|
||||
ALWAYS,
|
||||
|
@ -35,9 +36,9 @@ public class ClusterRebalanceNodeAllocation extends NodeAllocation {
|
|||
|
||||
private final ClusterRebalanceType type;
|
||||
|
||||
@Inject public ClusterRebalanceNodeAllocation(Settings settings) {
|
||||
@Inject public ClusterRebalanceAllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
String allowRebalance = componentSettings.get("allow_rebalance", "indices_all_active");
|
||||
String allowRebalance = settings.get("cluster.routing.allocation.allow_rebalance", "indices_all_active");
|
||||
if ("always".equalsIgnoreCase(allowRebalance)) {
|
||||
type = ClusterRebalanceType.ALWAYS;
|
||||
} else if ("indices_primaries_active".equalsIgnoreCase(allowRebalance) || "indicesPrimariesActive".equalsIgnoreCase(allowRebalance)) {
|
|
@ -17,22 +17,23 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
public class ConcurrentRebalanceNodeAllocation extends NodeAllocation {
|
||||
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
|
||||
|
||||
private final int clusterConcurrentRebalance;
|
||||
|
||||
@Inject public ConcurrentRebalanceNodeAllocation(Settings settings) {
|
||||
@Inject public ConcurrentRebalanceAllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
this.clusterConcurrentRebalance = componentSettings.getAsInt("cluster_concurrent_rebalance", 2);
|
||||
this.clusterConcurrentRebalance = settings.getAsInt("cluster.routing.allocation.cluster_concurrent_rebalance", 2);
|
||||
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
|
||||
}
|
||||
|
|
@ -17,10 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
|
@ -28,12 +29,10 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
* Only allow rebalancing when all shards are active within the shard replication group.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class RebalanceOnlyWhenActiveNodeAllocation extends NodeAllocation {
|
||||
public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider {
|
||||
|
||||
@Inject public RebalanceOnlyWhenActiveNodeAllocation(Settings settings) {
|
||||
@Inject public RebalanceOnlyWhenActiveAllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
|
@ -17,22 +17,21 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
/**
|
||||
* An allocation strategy that only allows for a replica to be allocated when the primary is active.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation {
|
||||
public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecider {
|
||||
|
||||
@Inject public ReplicaAfterPrimaryActiveNodeAllocation(Settings settings) {
|
||||
@Inject public ReplicaAfterPrimaryActiveAllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
|
@ -17,22 +17,21 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
/**
|
||||
* An allocation strategy that does not allow for the same shard instance to be allocated on the same node.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SameShardNodeAllocation extends NodeAllocation {
|
||||
public class SameShardAllocationDecider extends AllocationDecider {
|
||||
|
||||
@Inject public SameShardNodeAllocation(Settings settings) {
|
||||
@Inject public SameShardAllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
|
@ -17,21 +17,21 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class ThrottlingNodeAllocation extends NodeAllocation {
|
||||
public class ThrottlingAllocationDecider extends AllocationDecider {
|
||||
|
||||
static {
|
||||
MetaData.addDynamicSettings(
|
||||
|
@ -43,11 +43,11 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
|
|||
private volatile int primariesInitialRecoveries;
|
||||
private volatile int concurrentRecoveries;
|
||||
|
||||
@Inject public ThrottlingNodeAllocation(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
@Inject public ThrottlingAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
|
||||
this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4);
|
||||
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2));
|
||||
this.primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", 4));
|
||||
this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", 2));
|
||||
logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
|
||||
|
||||
nodeSettingsService.addListener(new ApplySettings());
|
||||
|
@ -97,16 +97,16 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
|
|||
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override public void onRefreshSettings(Settings settings) {
|
||||
int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingNodeAllocation.this.primariesInitialRecoveries);
|
||||
if (primariesInitialRecoveries != ThrottlingNodeAllocation.this.primariesInitialRecoveries) {
|
||||
logger.info("updating [cluster.routing.allocation.node_initial_primaries_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.primariesInitialRecoveries, primariesInitialRecoveries);
|
||||
ThrottlingNodeAllocation.this.primariesInitialRecoveries = primariesInitialRecoveries;
|
||||
int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingAllocationDecider.this.primariesInitialRecoveries);
|
||||
if (primariesInitialRecoveries != ThrottlingAllocationDecider.this.primariesInitialRecoveries) {
|
||||
logger.info("updating [cluster.routing.allocation.node_initial_primaries_recoveries] from [{}] to [{}]", ThrottlingAllocationDecider.this.primariesInitialRecoveries, primariesInitialRecoveries);
|
||||
ThrottlingAllocationDecider.this.primariesInitialRecoveries = primariesInitialRecoveries;
|
||||
}
|
||||
|
||||
int concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", ThrottlingNodeAllocation.this.concurrentRecoveries);
|
||||
if (concurrentRecoveries != ThrottlingNodeAllocation.this.concurrentRecoveries) {
|
||||
logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.concurrentRecoveries, concurrentRecoveries);
|
||||
ThrottlingNodeAllocation.this.concurrentRecoveries = concurrentRecoveries;
|
||||
int concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", ThrottlingAllocationDecider.this.concurrentRecoveries);
|
||||
if (concurrentRecoveries != ThrottlingAllocationDecider.this.concurrentRecoveries) {
|
||||
logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingAllocationDecider.this.concurrentRecoveries, concurrentRecoveries);
|
||||
ThrottlingAllocationDecider.this.concurrentRecoveries = concurrentRecoveries;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,11 +27,10 @@ import org.elasticsearch.cluster.routing.RoutingNode;
|
|||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -78,19 +77,19 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
|
|||
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
|
||||
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
|
||||
@Override public void applyStartedShards(StartedRerouteAllocation allocation) {
|
||||
for (ShardRouting shardRouting : allocation.startedShards()) {
|
||||
cachedCommitPoints.remove(shardRouting.shardId());
|
||||
cachedStores.remove(shardRouting.shardId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
|
||||
@Override public void applyFailedShards(FailedRerouteAllocation allocation) {
|
||||
cachedCommitPoints.remove(allocation.failedShard().shardId());
|
||||
cachedStores.remove(allocation.failedShard().shardId());
|
||||
}
|
||||
|
||||
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
@Override public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
|
||||
DiscoveryNodes nodes = allocation.nodes();
|
||||
|
@ -116,7 +115,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
|
|||
continue;
|
||||
}
|
||||
// if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
|
||||
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
|
||||
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) {
|
||||
canBeAllocatedToAtLeastOneNode = true;
|
||||
break;
|
||||
}
|
||||
|
@ -150,7 +149,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
|
|||
// check if we can allocate on that node...
|
||||
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
|
||||
// then we will try and assign it next time
|
||||
if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) {
|
||||
if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -233,7 +232,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
|
|||
}
|
||||
|
||||
if (lastNodeMatched != null) {
|
||||
if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) {
|
||||
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
||||
}
|
||||
|
|
|
@ -28,11 +28,10 @@ import org.elasticsearch.cluster.routing.RoutingNode;
|
|||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -86,20 +85,20 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
|
|||
logger.debug("using initial_shards [{}], list_timeout [{}]", initialShards, listTimeout);
|
||||
}
|
||||
|
||||
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
|
||||
@Override public void applyStartedShards(StartedRerouteAllocation allocation) {
|
||||
for (ShardRouting shardRouting : allocation.startedShards()) {
|
||||
cachedStores.remove(shardRouting.shardId());
|
||||
cachedShardsState.remove(shardRouting.shardId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
|
||||
@Override public void applyFailedShards(FailedRerouteAllocation allocation) {
|
||||
ShardRouting failedShard = allocation.failedShard();
|
||||
cachedStores.remove(failedShard.shardId());
|
||||
cachedShardsState.remove(failedShard.shardId());
|
||||
}
|
||||
|
||||
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
@Override public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
DiscoveryNodes nodes = allocation.nodes();
|
||||
RoutingNodes routingNodes = allocation.routingNodes();
|
||||
|
@ -189,10 +188,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
|
|||
Set<DiscoveryNode> noNodes = Sets.newHashSet();
|
||||
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
|
||||
RoutingNode node = routingNodes.node(discoNode.id());
|
||||
NodeAllocation.Decision decision = nodeAllocations.canAllocate(shard, node, allocation);
|
||||
if (decision == NodeAllocation.Decision.THROTTLE) {
|
||||
AllocationDecider.Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
|
||||
if (decision == AllocationDecider.Decision.THROTTLE) {
|
||||
throttledNodes.add(discoNode);
|
||||
} else if (decision == NodeAllocation.Decision.NO) {
|
||||
} else if (decision == AllocationDecider.Decision.NO) {
|
||||
noNodes.add(discoNode);
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
@ -252,7 +251,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
|
|||
}
|
||||
// if we can't allocate it on a node, ignore it, for example, this handles
|
||||
// cases for only allocating a replica after a primary
|
||||
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
|
||||
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) {
|
||||
canBeAllocatedToAtLeastOneNode = true;
|
||||
break;
|
||||
}
|
||||
|
@ -286,7 +285,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
|
|||
// check if we can allocate on that node...
|
||||
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
|
||||
// then we will try and assign it next time
|
||||
if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) {
|
||||
if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -322,7 +321,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
|
|||
|
||||
if (lastNodeMatched != null) {
|
||||
// we only check on THROTTLE since we checked before before on NO
|
||||
if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) {
|
||||
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.gateway.none;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
|
||||
|
@ -29,13 +28,13 @@ import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
|
|||
*/
|
||||
public class NoneGatewayAllocator implements GatewayAllocator {
|
||||
|
||||
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
|
||||
@Override public void applyStartedShards(StartedRerouteAllocation allocation) {
|
||||
}
|
||||
|
||||
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
|
||||
@Override public void applyFailedShards(FailedRerouteAllocation allocation) {
|
||||
}
|
||||
|
||||
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
|
||||
@Override public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -44,7 +45,7 @@ public class ClusterRebalanceRoutingTests {
|
|||
private final ESLogger logger = Loggers.getLogger(ClusterRebalanceRoutingTests.class);
|
||||
|
||||
@Test public void testAlways() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -129,7 +130,7 @@ public class ClusterRebalanceRoutingTests {
|
|||
|
||||
|
||||
@Test public void testClusterPrimariesActive1() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -232,7 +233,7 @@ public class ClusterRebalanceRoutingTests {
|
|||
}
|
||||
|
||||
@Test public void testClusterPrimariesActive2() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -315,7 +316,7 @@ public class ClusterRebalanceRoutingTests {
|
|||
}
|
||||
|
||||
@Test public void testClusterAllActive1() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -437,7 +438,7 @@ public class ClusterRebalanceRoutingTests {
|
|||
}
|
||||
|
||||
@Test public void testClusterAllActive2() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -520,7 +521,7 @@ public class ClusterRebalanceRoutingTests {
|
|||
}
|
||||
|
||||
@Test public void testClusterAllActive3() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -44,7 +45,7 @@ public class FailedNodeRoutingTests {
|
|||
private final ESLogger logger = Loggers.getLogger(FailedNodeRoutingTests.class);
|
||||
|
||||
@Test public void simpleFailedNodeTest() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -103,7 +104,7 @@ public class FailedNodeRoutingTests {
|
|||
}
|
||||
|
||||
@Test public void simpleFailedNodeTestNoReassign() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -44,7 +45,7 @@ public class ShardVersioningTests {
|
|||
private final ESLogger logger = Loggers.getLogger(ShardVersioningTests.class);
|
||||
|
||||
@Test public void simple() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
|
||||
|
|
Loading…
Reference in New Issue