From 203564a5b0e4d949eaf7d34ea405385a1fefd2db Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 6 Sep 2011 17:51:11 +0300 Subject: [PATCH] externalize shard allocation decision to a separate module --- .../routing/allocation/AllocationModule.java | 24 ++------ .../routing/allocation/AllocationService.java | 25 +++++---- .../allocation/FailedRerouteAllocation.java | 5 +- .../routing/allocation/RoutingAllocation.java | 10 +++- .../allocation/StartedRerouteAllocation.java | 5 +- .../allocator/EvenShardsCountAllocator.java | 17 +++--- .../allocator/GatewayAllocator.java | 7 +-- .../allocation/allocator/ShardsAllocator.java | 9 ++- .../allocator/ShardsAllocators.java | 23 ++++---- .../AllocationDecider.java} | 7 ++- .../AllocationDeciders.java} | 35 ++++++------ .../decider/AllocationDecidersModule.java | 55 +++++++++++++++++++ .../ClusterRebalanceAllocationDecider.java} | 9 +-- ...ConcurrentRebalanceAllocationDecider.java} | 9 +-- ...lanceOnlyWhenActiveAllocationDecider.java} | 9 ++- ...aAfterPrimaryActiveAllocationDecider.java} | 9 ++- .../SameShardAllocationDecider.java} | 9 ++- .../ThrottlingAllocationDecider.java} | 28 +++++----- .../BlobReuseExistingGatewayAllocator.java | 15 +++-- .../gateway/local/LocalGatewayAllocator.java | 21 ++++--- .../gateway/none/NoneGatewayAllocator.java | 7 +-- .../ClusterRebalanceRoutingTests.java | 13 +++-- .../allocation/FailedNodeRoutingTests.java | 5 +- .../allocation/ShardVersioningTests.java | 3 +- 24 files changed, 202 insertions(+), 157 deletions(-) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{NodeAllocation.java => decider/AllocationDecider.java} (88%) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{NodeAllocations.java => decider/AllocationDeciders.java} (64%) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{ClusterRebalanceNodeAllocation.java => decider/ClusterRebalanceAllocationDecider.java} (89%) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{ConcurrentRebalanceNodeAllocation.java => decider/ConcurrentRebalanceAllocationDecider.java} (82%) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{RebalanceOnlyWhenActiveNodeAllocation.java => decider/RebalanceOnlyWhenActiveAllocationDecider.java} (85%) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{ReplicaAfterPrimaryActiveNodeAllocation.java => decider/ReplicaAfterPrimaryActiveAllocationDecider.java} (84%) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{SameShardNodeAllocation.java => decider/SameShardAllocationDecider.java} (86%) rename modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/{ThrottlingNodeAllocation.java => decider/ThrottlingAllocationDecider.java} (73%) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java index 4681f2691b3..fbec8881186 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java @@ -20,12 +20,13 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; -import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; import java.util.List; @@ -37,34 +38,17 @@ public class AllocationModule extends AbstractModule implements SpawnModules { private final Settings settings; - private List> allocations = Lists.newArrayList(); + private List> allocations = Lists.newArrayList(); public AllocationModule(Settings settings) { this.settings = settings; } @Override public Iterable spawnModules() { - return ImmutableList.of(new ShardsAllocatorModule(settings)); - } - - public void addNodeAllocation(Class nodeAllocation) { - allocations.add(nodeAllocation); + return ImmutableList.of(new ShardsAllocatorModule(settings), new AllocationDecidersModule(settings)); } @Override protected void configure() { bind(AllocationService.class).asEagerSingleton(); - - Multibinder allocationMultibinder = Multibinder.newSetBinder(binder(), NodeAllocation.class); - allocationMultibinder.addBinding().to(SameShardNodeAllocation.class); - allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class); - allocationMultibinder.addBinding().to(ThrottlingNodeAllocation.class); - allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveNodeAllocation.class); - allocationMultibinder.addBinding().to(ClusterRebalanceNodeAllocation.class); - allocationMultibinder.addBinding().to(ConcurrentRebalanceNodeAllocation.class); - for (Class allocation : allocations) { - allocationMultibinder.addBinding().to(allocation); - } - - bind(NodeAllocations.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index a3238926193..e1e1b25bfe2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; @@ -47,7 +48,7 @@ import static org.elasticsearch.common.collect.Sets.*; */ public class AllocationService extends AbstractComponent { - private final NodeAllocations nodeAllocations; + private final AllocationDeciders allocationDeciders; private final ShardsAllocators shardsAllocators; @@ -57,14 +58,14 @@ public class AllocationService extends AbstractComponent { public AllocationService(Settings settings) { this(settings, - new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)), + new AllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)), new ShardsAllocators(settings) ); } - @Inject public AllocationService(Settings settings, NodeAllocations nodeAllocations, ShardsAllocators shardsAllocators) { + @Inject public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators) { super(settings); - this.nodeAllocations = nodeAllocations; + this.allocationDeciders = allocationDeciders; this.shardsAllocators = shardsAllocators; } @@ -75,12 +76,12 @@ public class AllocationService extends AbstractComponent { */ public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards) { RoutingNodes routingNodes = clusterState.routingNodes(); - StartedRerouteAllocation allocation = new StartedRerouteAllocation(routingNodes, clusterState.nodes(), startedShards); + StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards); boolean changed = applyStartedShards(routingNodes, startedShards); if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } - shardsAllocators.applyStartedShards(nodeAllocations, allocation); + shardsAllocators.applyStartedShards(allocation); reroute(allocation); return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } @@ -92,12 +93,12 @@ public class AllocationService extends AbstractComponent { */ public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { RoutingNodes routingNodes = clusterState.routingNodes(); - FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShard); + FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard); boolean changed = applyFailedShard(allocation); if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } - shardsAllocators.applyFailedShards(nodeAllocations, allocation); + shardsAllocators.applyFailedShards(allocation); reroute(allocation); return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } @@ -109,7 +110,7 @@ public class AllocationService extends AbstractComponent { */ public RoutingAllocation.Result reroute(ClusterState clusterState) { RoutingNodes routingNodes = clusterState.routingNodes(); - RoutingAllocation allocation = new RoutingAllocation(routingNodes, clusterState.nodes()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes()); if (!reroute(allocation)) { return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } @@ -123,7 +124,7 @@ public class AllocationService extends AbstractComponent { */ public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState) { RoutingNodes routingNodes = clusterState.routingNodes(); - RoutingAllocation allocation = new RoutingAllocation(routingNodes, clusterState.nodes()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes()); Iterable dataNodes = allocation.nodes().dataNodes().values(); boolean changed = false; // first, clear from the shards any node id they used to belong to that is now dead @@ -158,13 +159,13 @@ public class AllocationService extends AbstractComponent { // now allocate all the unassigned to available nodes if (allocation.routingNodes().hasUnassigned()) { - changed |= shardsAllocators.allocateUnassigned(nodeAllocations, allocation); + changed |= shardsAllocators.allocateUnassigned(allocation); // elect primaries again, in case this is needed with unassigned allocation changed |= electPrimaries(allocation.routingNodes()); } // rebalance - changed |= shardsAllocators.rebalance(nodeAllocations, allocation); + changed |= shardsAllocators.rebalance(allocation); return changed; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java index 5400468533c..ef1b7fc0f82 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; /** * @author kimchy (shay.banon) @@ -30,8 +31,8 @@ public class FailedRerouteAllocation extends RoutingAllocation { private final ShardRouting failedShard; - public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) { - super(routingNodes, nodes); + public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) { + super(deciders, routingNodes, nodes); this.failedShard = failedShard; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index d2489d5d615..6bb6e893b33 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.index.shard.ShardId; import java.util.HashMap; @@ -59,6 +60,8 @@ public class RoutingAllocation { } } + private final AllocationDeciders deciders; + private final RoutingNodes routingNodes; private final DiscoveryNodes nodes; @@ -67,11 +70,16 @@ public class RoutingAllocation { private Map ignoredShardToNodes = null; - public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) { + public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) { + this.deciders = deciders; this.routingNodes = routingNodes; this.nodes = nodes; } + public AllocationDeciders deciders() { + return this.deciders; + } + public RoutingTable routingTable() { return routingNodes.routingTable(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java index 74ac249ca89..d4385e502a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import java.util.List; @@ -32,8 +33,8 @@ public class StartedRerouteAllocation extends RoutingAllocation { private final List startedShards; - public StartedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { - super(routingNodes, nodes); + public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { + super(deciders, routingNodes, nodes); this.startedShards = startedShards; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java index 11a426cecb4..a47d44cef0e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.common.component.AbstractComponent; @@ -43,13 +42,13 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard super(settings); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + @Override public void applyStartedShards(StartedRerouteAllocation allocation) { } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + @Override public void applyFailedShards(FailedRerouteAllocation allocation) { } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + @Override public boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; RoutingNodes routingNodes = allocation.routingNodes(); @@ -69,7 +68,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard lastNode = 0; } - if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { + if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size(); if (numberOfShardsToAllocate <= 0) { continue; @@ -88,7 +87,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard MutableShardRouting shard = it.next(); // go over the nodes and try and allocate the remaining ones for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) { - if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) { + if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) { changed = true; routingNode.add(shard); it.remove(); @@ -99,7 +98,7 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard return changed; } - @Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + @Override public boolean rebalance(RoutingAllocation allocation) { boolean changed = false; List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); if (sortedNodesLeastToHigh.isEmpty()) { @@ -129,11 +128,11 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard boolean relocated = false; List startedShards = highRoutingNode.shardsWithState(STARTED); for (MutableShardRouting startedShard : startedShards) { - if (!nodeAllocations.canRebalance(startedShard, allocation)) { + if (!allocation.deciders().canRebalance(startedShard, allocation)) { continue; } - if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { + if (allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { changed = true; lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), lowRoutingNode.nodeId(), startedShard.currentNodeId(), diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java index d8ad63e1124..f80beceb685 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -29,9 +28,9 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; */ public interface GatewayAllocator { - void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation); + void applyStartedShards(StartedRerouteAllocation allocation); - void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation); + void applyFailedShards(FailedRerouteAllocation allocation); - boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation); + boolean allocateUnassigned(RoutingAllocation allocation); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index fea1cf76c01..25ef9d21a55 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -28,11 +27,11 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; */ public interface ShardsAllocator { - void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation); + void applyStartedShards(StartedRerouteAllocation allocation); - void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation); + void applyFailedShards(FailedRerouteAllocation allocation); - boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation); + boolean allocateUnassigned(RoutingAllocation allocation); - boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation); + boolean rebalance(RoutingAllocation allocation); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index 6afc7805984..8dd4e00c572 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.common.component.AbstractComponent; @@ -50,24 +49,24 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat this.allocator = allocator; } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { - gatewayAllocator.applyStartedShards(nodeAllocations, allocation); - allocator.applyStartedShards(nodeAllocations, allocation); + @Override public void applyStartedShards(StartedRerouteAllocation allocation) { + gatewayAllocator.applyStartedShards(allocation); + allocator.applyStartedShards(allocation); } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { - gatewayAllocator.applyFailedShards(nodeAllocations, allocation); - allocator.applyFailedShards(nodeAllocations, allocation); + @Override public void applyFailedShards(FailedRerouteAllocation allocation) { + gatewayAllocator.applyFailedShards(allocation); + allocator.applyFailedShards(allocation); } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + @Override public boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; - changed |= gatewayAllocator.allocateUnassigned(nodeAllocations, allocation); - changed |= allocator.allocateUnassigned(nodeAllocations, allocation); + changed |= gatewayAllocator.allocateUnassigned(allocation); + changed |= allocator.allocateUnassigned(allocation); return changed; } - @Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) { - return allocator.rebalance(nodeAllocations, allocation); + @Override public boolean rebalance(RoutingAllocation allocation) { + return allocator.rebalance(allocation); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java similarity index 88% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index f1a4261878f..7c53cd68fe5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -17,10 +17,11 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -29,7 +30,7 @@ import org.elasticsearch.common.settings.Settings; * * @author kimchy (shay.banon) */ -public abstract class NodeAllocation extends AbstractComponent { +public abstract class AllocationDecider extends AbstractComponent { public static enum Decision { YES { @@ -51,7 +52,7 @@ public abstract class NodeAllocation extends AbstractComponent { public abstract boolean allocate(); } - protected NodeAllocation(Settings settings) { + protected AllocationDecider(Settings settings) { super(settings); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java similarity index 64% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 9b77cccaf41..32d799179f4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -17,10 +17,11 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -29,33 +30,31 @@ import org.elasticsearch.node.settings.NodeSettingsService; import java.util.Set; /** - * Holds several {@link NodeAllocation}s and combines them into a single allocation decision. - * - * @author kimchy (shay.banon) + * Holds several {@link AllocationDecider}s and combines them into a single allocation decision. */ -public class NodeAllocations extends NodeAllocation { +public class AllocationDeciders extends AllocationDecider { - private final NodeAllocation[] allocations; + private final AllocationDecider[] allocations; - public NodeAllocations(Settings settings, NodeSettingsService nodeSettingsService) { - this(settings, ImmutableSet.builder() - .add(new SameShardNodeAllocation(settings)) - .add(new ReplicaAfterPrimaryActiveNodeAllocation(settings)) - .add(new ThrottlingNodeAllocation(settings, nodeSettingsService)) - .add(new RebalanceOnlyWhenActiveNodeAllocation(settings)) - .add(new ClusterRebalanceNodeAllocation(settings)) - .add(new ConcurrentRebalanceNodeAllocation(settings)) + public AllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService) { + this(settings, ImmutableSet.builder() + .add(new SameShardAllocationDecider(settings)) + .add(new ReplicaAfterPrimaryActiveAllocationDecider(settings)) + .add(new ThrottlingAllocationDecider(settings, nodeSettingsService)) + .add(new RebalanceOnlyWhenActiveAllocationDecider(settings)) + .add(new ClusterRebalanceAllocationDecider(settings)) + .add(new ConcurrentRebalanceAllocationDecider(settings)) .build() ); } - @Inject public NodeAllocations(Settings settings, Set allocations) { + @Inject public AllocationDeciders(Settings settings, Set allocations) { super(settings); - this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); + this.allocations = allocations.toArray(new AllocationDecider[allocations.size()]); } @Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { - for (NodeAllocation allocation1 : allocations) { + for (AllocationDecider allocation1 : allocations) { if (!allocation1.canRebalance(shardRouting, allocation)) { return false; } @@ -70,7 +69,7 @@ public class NodeAllocations extends NodeAllocation { return Decision.NO; } // now, go over the registered allocations - for (NodeAllocation allocation1 : allocations) { + for (AllocationDecider allocation1 : allocations) { Decision decision = allocation1.canAllocate(shardRouting, node, allocation); if (decision == Decision.NO) { return Decision.NO; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java new file mode 100644 index 00000000000..8a06ea585a6 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.Multibinder; +import org.elasticsearch.common.settings.Settings; + +import java.util.List; + +/** + */ +public class AllocationDecidersModule extends AbstractModule { + + private final Settings settings; + + private List> allocations = Lists.newArrayList(); + + public AllocationDecidersModule(Settings settings) { + this.settings = settings; + } + + @Override protected void configure() { + Multibinder allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class); + allocationMultibinder.addBinding().to(SameShardAllocationDecider.class); + allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveAllocationDecider.class); + allocationMultibinder.addBinding().to(ThrottlingAllocationDecider.class); + allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveAllocationDecider.class); + allocationMultibinder.addBinding().to(ClusterRebalanceAllocationDecider.class); + allocationMultibinder.addBinding().to(ConcurrentRebalanceAllocationDecider.class); + for (Class allocation : allocations) { + allocationMultibinder.addBinding().to(allocation); + } + + bind(AllocationDeciders.class).asEagerSingleton(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java similarity index 89% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java index 7da94d31250..159984e4da8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java @@ -17,15 +17,16 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -public class ClusterRebalanceNodeAllocation extends NodeAllocation { +public class ClusterRebalanceAllocationDecider extends AllocationDecider { public static enum ClusterRebalanceType { ALWAYS, @@ -35,9 +36,9 @@ public class ClusterRebalanceNodeAllocation extends NodeAllocation { private final ClusterRebalanceType type; - @Inject public ClusterRebalanceNodeAllocation(Settings settings) { + @Inject public ClusterRebalanceAllocationDecider(Settings settings) { super(settings); - String allowRebalance = componentSettings.get("allow_rebalance", "indices_all_active"); + String allowRebalance = settings.get("cluster.routing.allocation.allow_rebalance", "indices_all_active"); if ("always".equalsIgnoreCase(allowRebalance)) { type = ClusterRebalanceType.ALWAYS; } else if ("indices_primaries_active".equalsIgnoreCase(allowRebalance) || "indicesPrimariesActive".equalsIgnoreCase(allowRebalance)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java similarity index 82% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 1837cdf108e..93f8f89eba8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -17,22 +17,23 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -public class ConcurrentRebalanceNodeAllocation extends NodeAllocation { +public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { private final int clusterConcurrentRebalance; - @Inject public ConcurrentRebalanceNodeAllocation(Settings settings) { + @Inject public ConcurrentRebalanceAllocationDecider(Settings settings) { super(settings); - this.clusterConcurrentRebalance = componentSettings.getAsInt("cluster_concurrent_rebalance", 2); + this.clusterConcurrentRebalance = settings.getAsInt("cluster.routing.allocation.cluster_concurrent_rebalance", 2); logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java similarity index 85% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java index 2efae2ed95c..32c6df5082d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java @@ -17,10 +17,11 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -28,12 +29,10 @@ import java.util.List; /** * Only allow rebalancing when all shards are active within the shard replication group. - * - * @author kimchy (shay.banon) */ -public class RebalanceOnlyWhenActiveNodeAllocation extends NodeAllocation { +public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider { - @Inject public RebalanceOnlyWhenActiveNodeAllocation(Settings settings) { + @Inject public RebalanceOnlyWhenActiveAllocationDecider(Settings settings) { super(settings); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java similarity index 84% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java index 53486c457c1..ffdc8ea420b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java @@ -17,22 +17,21 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; /** * An allocation strategy that only allows for a replica to be allocated when the primary is active. - * - * @author kimchy (shay.banon) */ -public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation { +public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecider { - @Inject public ReplicaAfterPrimaryActiveNodeAllocation(Settings settings) { + @Inject public ReplicaAfterPrimaryActiveAllocationDecider(Settings settings) { super(settings); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java similarity index 86% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java index 3277c800c3f..b2b9d9381d3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java @@ -17,22 +17,21 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; /** * An allocation strategy that does not allow for the same shard instance to be allocated on the same node. - * - * @author kimchy (shay.banon) */ -public class SameShardNodeAllocation extends NodeAllocation { +public class SameShardAllocationDecider extends AllocationDecider { - @Inject public SameShardNodeAllocation(Settings settings) { + @Inject public SameShardAllocationDecider(Settings settings) { super(settings); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java similarity index 73% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 869e24c8f11..e0da196bae3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -17,21 +17,21 @@ * under the License. */ -package org.elasticsearch.cluster.routing.allocation; +package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.settings.NodeSettingsService; /** - * @author kimchy (shay.banon) */ -public class ThrottlingNodeAllocation extends NodeAllocation { +public class ThrottlingAllocationDecider extends AllocationDecider { static { MetaData.addDynamicSettings( @@ -43,11 +43,11 @@ public class ThrottlingNodeAllocation extends NodeAllocation { private volatile int primariesInitialRecoveries; private volatile int concurrentRecoveries; - @Inject public ThrottlingNodeAllocation(Settings settings, NodeSettingsService nodeSettingsService) { + @Inject public ThrottlingAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); - this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4); - this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2)); + this.primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", 4)); + this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", 2)); logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries); nodeSettingsService.addListener(new ApplySettings()); @@ -97,16 +97,16 @@ public class ThrottlingNodeAllocation extends NodeAllocation { class ApplySettings implements NodeSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { - int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingNodeAllocation.this.primariesInitialRecoveries); - if (primariesInitialRecoveries != ThrottlingNodeAllocation.this.primariesInitialRecoveries) { - logger.info("updating [cluster.routing.allocation.node_initial_primaries_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.primariesInitialRecoveries, primariesInitialRecoveries); - ThrottlingNodeAllocation.this.primariesInitialRecoveries = primariesInitialRecoveries; + int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingAllocationDecider.this.primariesInitialRecoveries); + if (primariesInitialRecoveries != ThrottlingAllocationDecider.this.primariesInitialRecoveries) { + logger.info("updating [cluster.routing.allocation.node_initial_primaries_recoveries] from [{}] to [{}]", ThrottlingAllocationDecider.this.primariesInitialRecoveries, primariesInitialRecoveries); + ThrottlingAllocationDecider.this.primariesInitialRecoveries = primariesInitialRecoveries; } - int concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", ThrottlingNodeAllocation.this.concurrentRecoveries); - if (concurrentRecoveries != ThrottlingNodeAllocation.this.concurrentRecoveries) { - logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.concurrentRecoveries, concurrentRecoveries); - ThrottlingNodeAllocation.this.concurrentRecoveries = concurrentRecoveries; + int concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", ThrottlingAllocationDecider.this.concurrentRecoveries); + if (concurrentRecoveries != ThrottlingAllocationDecider.this.concurrentRecoveries) { + logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingAllocationDecider.this.concurrentRecoveries, concurrentRecoveries); + ThrottlingAllocationDecider.this.concurrentRecoveries = concurrentRecoveries; } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java index d58af81fc46..cfc35ed5709 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java @@ -27,11 +27,10 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; @@ -78,19 +77,19 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + @Override public void applyStartedShards(StartedRerouteAllocation allocation) { for (ShardRouting shardRouting : allocation.startedShards()) { cachedCommitPoints.remove(shardRouting.shardId()); cachedStores.remove(shardRouting.shardId()); } } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + @Override public void applyFailedShards(FailedRerouteAllocation allocation) { cachedCommitPoints.remove(allocation.failedShard().shardId()); cachedStores.remove(allocation.failedShard().shardId()); } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + @Override public boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; DiscoveryNodes nodes = allocation.nodes(); @@ -116,7 +115,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme continue; } // if its THROTTLING, we are not going to allocate it to this node, so ignore it as well - if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { + if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { canBeAllocatedToAtLeastOneNode = true; break; } @@ -150,7 +149,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme // check if we can allocate on that node... // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) { + if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) { continue; } @@ -233,7 +232,7 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme } if (lastNodeMatched != null) { - if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) { + if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) { if (logger.isTraceEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java index 31517010043..6eecf040f73 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java @@ -28,11 +28,10 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; @@ -86,20 +85,20 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA logger.debug("using initial_shards [{}], list_timeout [{}]", initialShards, listTimeout); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + @Override public void applyStartedShards(StartedRerouteAllocation allocation) { for (ShardRouting shardRouting : allocation.startedShards()) { cachedStores.remove(shardRouting.shardId()); cachedShardsState.remove(shardRouting.shardId()); } } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + @Override public void applyFailedShards(FailedRerouteAllocation allocation) { ShardRouting failedShard = allocation.failedShard(); cachedStores.remove(failedShard.shardId()); cachedShardsState.remove(failedShard.shardId()); } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + @Override public boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; DiscoveryNodes nodes = allocation.nodes(); RoutingNodes routingNodes = allocation.routingNodes(); @@ -189,10 +188,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA Set noNodes = Sets.newHashSet(); for (DiscoveryNode discoNode : nodesWithHighestVersion) { RoutingNode node = routingNodes.node(discoNode.id()); - NodeAllocation.Decision decision = nodeAllocations.canAllocate(shard, node, allocation); - if (decision == NodeAllocation.Decision.THROTTLE) { + AllocationDecider.Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision == AllocationDecider.Decision.THROTTLE) { throttledNodes.add(discoNode); - } else if (decision == NodeAllocation.Decision.NO) { + } else if (decision == AllocationDecider.Decision.NO) { noNodes.add(discoNode); } else { if (logger.isDebugEnabled()) { @@ -252,7 +251,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA } // if we can't allocate it on a node, ignore it, for example, this handles // cases for only allocating a replica after a primary - if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { + if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) { canBeAllocatedToAtLeastOneNode = true; break; } @@ -286,7 +285,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA // check if we can allocate on that node... // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) { + if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) { continue; } @@ -322,7 +321,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA if (lastNodeMatched != null) { // we only check on THROTTLE since we checked before before on NO - if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) { + if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) { if (logger.isTraceEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java index 7513a7dddd5..81c9739abc0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java @@ -20,7 +20,6 @@ package org.elasticsearch.gateway.none; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; @@ -29,13 +28,13 @@ import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; */ public class NoneGatewayAllocator implements GatewayAllocator { - @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + @Override public void applyStartedShards(StartedRerouteAllocation allocation) { } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + @Override public void applyFailedShards(FailedRerouteAllocation allocation) { } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + @Override public boolean allocateUnassigned(RoutingAllocation allocation) { return false; } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java index 51f20f7f668..289f93ac0e1 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.testng.annotations.Test; @@ -44,7 +45,7 @@ public class ClusterRebalanceRoutingTests { private final ESLogger logger = Loggers.getLogger(ClusterRebalanceRoutingTests.class); @Test public void testAlways() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -129,7 +130,7 @@ public class ClusterRebalanceRoutingTests { @Test public void testClusterPrimariesActive1() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -232,7 +233,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterPrimariesActive2() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -315,7 +316,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterAllActive1() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -437,7 +438,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterAllActive2() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -520,7 +521,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterAllActive3() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 0b5e8f4fcc5..bbdef0290e2 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.testng.annotations.Test; @@ -44,7 +45,7 @@ public class FailedNodeRoutingTests { private final ESLogger logger = Loggers.getLogger(FailedNodeRoutingTests.class); @Test public void simpleFailedNodeTest() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -103,7 +104,7 @@ public class FailedNodeRoutingTests { } @Test public void simpleFailedNodeTestNoReassign() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java index 96fdd548125..b9b4ae89d01 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.testng.annotations.Test; @@ -44,7 +45,7 @@ public class ShardVersioningTests { private final ESLogger logger = Loggers.getLogger(ShardVersioningTests.class); @Test public void simple() { - AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))