From b5b36215aeaa447e30648e9b21700d1d108857a8 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 21 Sep 2010 11:37:36 +0200 Subject: [PATCH] refactoring in routing logic to allow adding allocation explanation that later can be shown --- .../elasticsearch/cluster/ClusterState.java | 35 +++++- .../action/shard/ShardStateAction.java | 14 +-- .../metadata/MetaDataCreateIndexService.java | 5 +- .../metadata/MetaDataDeleteIndexService.java | 5 +- .../cluster/routing/RoutingService.java | 7 +- .../allocation/AllocationExplanation.java | 105 ++++++++++++++++++ .../allocation/FailedRerouteAllocation.java | 43 +++++++ .../routing/allocation/NodeAllocation.java | 14 +-- .../routing/allocation/NodeAllocations.java | 33 +++--- ...RebalanceOnlyWhenActiveNodeAllocation.java | 6 +- ...plicaAfterPrimaryActiveNodeAllocation.java | 10 +- .../routing/allocation/RoutingAllocation.java | 84 ++++++++++++++ .../allocation/SameShardNodeAllocation.java | 8 +- .../routing/allocation/ShardsAllocation.java | 85 +++++++------- .../allocation/StartedRerouteAllocation.java | 43 +++++++ .../allocation/ThrottlingNodeAllocation.java | 14 +-- .../BlobReuseExistingNodeAllocation.java | 27 ++--- .../local/LocalGatewayNodeAllocation.java | 34 +++--- ...ReplicaAsPrimaryDuringRelocationTests.java | 10 +- .../allocation/FailedShardsRoutingTests.java | 26 ++--- .../PrimaryElectionRoutingTests.java | 8 +- ...yNotRelocatedWhileBeingRecoveredTests.java | 8 +- .../allocation/RebalanceAfterActiveTests.java | 12 +- .../ReplicaAllocatedAfterPrimaryTests.java | 4 +- .../SingleShardNoReplicasRoutingTests.java | 32 +++--- .../SingleShardOneReplicaRoutingTests.java | 14 +-- .../TenShardsOneReplicaRoutingTests.java | 14 +-- .../allocation/ThrottlingAllocationTests.java | 22 ++-- .../UpdateNumberOfReplicasTests.java | 12 +- .../ClusterSerializationTests.java | 4 +- 30 files changed, 512 insertions(+), 226 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationExplanation.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java index d0c2e6aa830..55caa39aa2e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -25,6 +25,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationExplanation; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; @@ -46,19 +48,22 @@ public class ClusterState { private final ClusterBlocks blocks; + private final AllocationExplanation allocationExplanation; + // built on demand private volatile RoutingNodes routingNodes; public ClusterState(long version, ClusterState state) { - this(version, state.metaData(), state.routingTable(), state.nodes(), state.blocks()); + this(version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.allocationExplanation()); } - public ClusterState(long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks) { + public ClusterState(long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, AllocationExplanation allocationExplanation) { this.version = version; this.metaData = metaData; this.routingTable = routingTable; this.nodes = nodes; this.blocks = blocks; + this.allocationExplanation = allocationExplanation; } public long version() { @@ -109,6 +114,14 @@ public class ClusterState { return blocks; } + public AllocationExplanation allocationExplanation() { + return this.allocationExplanation; + } + + public AllocationExplanation getAllocationExplanation() { + return allocationExplanation(); + } + /** * Returns a built (on demand) routing nodes view of the routing table. NOTE, the routing nodes * are mutable, use them just for read operations @@ -141,6 +154,8 @@ public class ClusterState { private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK; + private AllocationExplanation allocationExplanation = AllocationExplanation.EMPTY; + public Builder nodes(DiscoveryNodes.Builder nodesBuilder) { return nodes(nodesBuilder.build()); } @@ -154,6 +169,12 @@ public class ClusterState { return routingTable(routingTable.build()); } + public Builder routingResult(RoutingAllocation.Result routingResult) { + this.routingTable = routingResult.routingTable(); + this.allocationExplanation = routingResult.explanation(); + return this; + } + public Builder routingTable(RoutingTable routingTable) { this.routingTable = routingTable; return this; @@ -177,6 +198,11 @@ public class ClusterState { return this; } + public Builder allocationExplanation(AllocationExplanation allocationExplanation) { + this.allocationExplanation = allocationExplanation; + return this; + } + public Builder version(long version) { this.version = version; return this; @@ -188,11 +214,12 @@ public class ClusterState { this.routingTable = state.routingTable(); this.metaData = state.metaData(); this.blocks = state.blocks(); + this.allocationExplanation = state.allocationExplanation(); return this; } public ClusterState build() { - return new ClusterState(version, metaData, routingTable, nodes, blocks); + return new ClusterState(version, metaData, routingTable, nodes, blocks, allocationExplanation); } public static byte[] toBytes(ClusterState state) throws IOException { @@ -211,6 +238,7 @@ public class ClusterState { RoutingTable.Builder.writeTo(state.routingTable(), out); DiscoveryNodes.Builder.writeTo(state.nodes(), out); ClusterBlocks.Builder.writeClusterBlocks(state.blocks(), out); + state.allocationExplanation().writeTo(out); } public static ClusterState readFrom(StreamInput in, @Nullable Settings globalSettings, @Nullable DiscoveryNode localNode) throws IOException { @@ -220,6 +248,7 @@ public class ClusterState { builder.routingTable = RoutingTable.Builder.readFrom(in); builder.nodes = DiscoveryNodes.Builder.readFrom(in, localNode); builder.blocks = ClusterBlocks.Builder.readClusterBlocks(in); + builder.allocationExplanation = AllocationExplanation.readAllocationExplanation(in); return builder.build(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 0a3dfe0a1ec..c24ffb45402 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -124,12 +125,11 @@ public class ShardStateAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason); } - RoutingTable prevRoutingTable = currentState.routingTable(); - RoutingTable newRoutingTable = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting)); - if (prevRoutingTable == newRoutingTable) { + RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting)); + if (!routingResult.changed()) { return currentState; } - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } }); } @@ -163,11 +163,11 @@ public class ShardStateAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("applying started shard {}, reason [{}]", shardRouting, reason); } - RoutingTable newRoutingTable = shardsAllocation.applyStartedShards(currentState, newArrayList(shardRouting)); - if (routingTable == newRoutingTable) { + RoutingAllocation.Result routingResult = shardsAllocation.applyStartedShards(currentState, newArrayList(shardRouting)); + if (!routingResult.changed()) { return currentState; } - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index f43440d409e..e7f4fe97506 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Maps; @@ -295,8 +296,8 @@ public class MetaDataCreateIndexService extends AbstractComponent { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) .initializeEmpty(currentState.metaData().index(request.index)); routingTableBuilder.add(indexRoutingBuilder); - RoutingTable newRoutingTable = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); + RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } @Override public void clusterStateProcessed(ClusterState clusterState) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index b69d227ee0c..d340845005a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -89,7 +90,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { .remove(request.index) .build(); - RoutingTable newRoutingTable = shardsAllocation.reroute( + RoutingAllocation.Result routingResult = shardsAllocation.reroute( newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build(); @@ -117,7 +118,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { listener.timeout = timeoutTask; - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).blocks(blocks).build(); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build(); } catch (Exception e) { listener.onFailure(e); return currentState; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index f9ef6df33de..49973e1a11d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -120,12 +121,12 @@ public class RoutingService extends AbstractLifecycleComponent i } clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - RoutingTable newRoutingTable = shardsAllocation.reroute(currentState); - if (newRoutingTable == currentState.routingTable()) { + RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState); + if (!routingResult.changed()) { // no state changed return currentState; } - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build(); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } }); routingTableDirty = false; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationExplanation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationExplanation.java new file mode 100644 index 00000000000..d178177551a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationExplanation.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * @author kimchy (shay.banon) + */ +public class AllocationExplanation implements Streamable { + + public static final AllocationExplanation EMPTY = new AllocationExplanation(); + + public static class NodeExplanation { + private final DiscoveryNode node; + + private final String description; + + public NodeExplanation(DiscoveryNode node, String description) { + this.node = node; + this.description = description; + } + + public DiscoveryNode node() { + return node; + } + + public String description() { + return description; + } + } + + private final Map> explanations = Maps.newHashMap(); + + public AllocationExplanation add(ShardId shardId, NodeExplanation nodeExplanation) { + List list = explanations.get(shardId); + if (list == null) { + list = Lists.newArrayList(); + explanations.put(shardId, list); + } + list.add(nodeExplanation); + return this; + } + + public Map> explanations() { + return this.explanations; + } + + public static AllocationExplanation readAllocationExplanation(StreamInput in) throws IOException { + AllocationExplanation e = new AllocationExplanation(); + e.readFrom(in); + return e; + } + + @Override public void readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + ShardId shardId = ShardId.readShardId(in); + int size2 = in.readVInt(); + List ne = Lists.newArrayListWithCapacity(size2); + for (int j = 0; j < size2; j++) { + ne.add(new NodeExplanation(DiscoveryNode.readNode(in), in.readUTF())); + } + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(explanations.size()); + for (Map.Entry> entry : explanations.entrySet()) { + entry.getKey().writeTo(out); + out.writeVInt(entry.getValue().size()); + for (NodeExplanation nodeExplanation : entry.getValue()) { + nodeExplanation.node().writeTo(out); + out.writeUTF(nodeExplanation.description()); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java new file mode 100644 index 00000000000..2be256645ef --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; + +import java.util.List; + +/** + * @author kimchy (shay.banon) + */ +public class FailedRerouteAllocation extends RoutingAllocation { + + private final List failedShards; + + public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { + super(routingNodes, nodes); + this.failedShards = failedShards; + } + + public List failedShards() { + return failedShards; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java index 27347cc216c..cf24f48b89a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java @@ -19,15 +19,11 @@ package org.elasticsearch.cluster.routing.allocation; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import java.util.List; - /** * A pluggable logic allowing to control if allocation of a shard is allowed on a specific node. * @@ -56,22 +52,22 @@ public abstract class NodeAllocation extends AbstractComponent { super(settings); } - public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { + public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { } - public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { + public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { } - public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { return false; } - public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) { + public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { return true; } - public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.YES; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index 4e42daa92a3..abb9645e0cf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -19,15 +19,12 @@ package org.elasticsearch.cluster.routing.allocation; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import java.util.List; import java.util.Set; /** @@ -54,39 +51,39 @@ public class NodeAllocations extends NodeAllocation { this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { - for (NodeAllocation allocation : allocations) { - allocation.applyStartedShards(nodeAllocations, routingNodes, nodes, startedShards); + @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + for (NodeAllocation allocation1 : allocations) { + allocation1.applyStartedShards(nodeAllocations, allocation); } } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { - for (NodeAllocation allocation : allocations) { - allocation.applyFailedShards(nodeAllocations, routingNodes, nodes, failedShards); + @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + for (NodeAllocation allocation1 : allocations) { + allocation1.applyFailedShards(nodeAllocations, allocation); } } - @Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) { - for (NodeAllocation allocation : allocations) { - if (!allocation.canRebalance(shardRouting, routingNodes, nodes)) { + @Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + for (NodeAllocation allocation1 : allocations) { + if (!allocation1.canRebalance(shardRouting, allocation)) { return false; } } return true; } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { boolean changed = false; - for (NodeAllocation allocation : allocations) { - changed |= allocation.allocateUnassigned(nodeAllocations, routingNodes, nodes); + for (NodeAllocation allocation1 : allocations) { + changed |= allocation1.allocateUnassigned(nodeAllocations, allocation); } return changed; } - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { Decision ret = Decision.YES; - for (NodeAllocation allocation : allocations) { - Decision decision = allocation.canAllocate(shardRouting, node, routingNodes); + for (NodeAllocation allocation1 : allocations) { + Decision decision = allocation1.canAllocate(shardRouting, node, allocation); if (decision == Decision.NO) { return Decision.NO; } else if (decision == Decision.THROTTLE) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java index 4f6422ee654..2efae2ed95c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RebalanceOnlyWhenActiveNodeAllocation.java @@ -19,9 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.MutableShardRouting; -import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -39,8 +37,8 @@ public class RebalanceOnlyWhenActiveNodeAllocation extends NodeAllocation { super(settings); } - @Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) { - List shards = routingNodes.shardsRoutingFor(shardRouting); + @Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + List shards = allocation.routingNodes().shardsRoutingFor(shardRouting); // its ok to check for active here, since in relocation, a shard is split into two in routing // nodes, once relocating, and one initializing for (ShardRouting allShard : shards) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java index 0705fce4d47..53486c457c1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java @@ -19,10 +19,8 @@ package org.elasticsearch.cluster.routing.allocation; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -38,15 +36,11 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation { super(settings); } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { - return false; - } - - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary()) { return Decision.YES; } - MutableShardRouting primary = routingNodes.findPrimaryForReplica(shardRouting); + MutableShardRouting primary = allocation.routingNodes().findPrimaryForReplica(shardRouting); if (primary == null || !primary.active()) { return Decision.NO; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java new file mode 100644 index 00000000000..073bf7e2480 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; + +/** + * @author kimchy (shay.banon) + */ +public class RoutingAllocation { + + public static class Result { + + private final boolean changed; + + private final RoutingTable routingTable; + + private final AllocationExplanation explanation; + + public Result(boolean changed, RoutingTable routingTable, AllocationExplanation explanation) { + this.changed = changed; + this.routingTable = routingTable; + this.explanation = explanation; + } + + public boolean changed() { + return this.changed; + } + + public RoutingTable routingTable() { + return routingTable; + } + + public AllocationExplanation explanation() { + return explanation; + } + } + + private final RoutingNodes routingNodes; + + private final DiscoveryNodes nodes; + + private final AllocationExplanation explanation = new AllocationExplanation(); + + public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) { + this.routingNodes = routingNodes; + this.nodes = nodes; + } + + public RoutingTable routingTable() { + return routingNodes.routingTable(); + } + + public RoutingNodes routingNodes() { + return routingNodes; + } + + public DiscoveryNodes nodes() { + return nodes; + } + + public AllocationExplanation explanation() { + return explanation; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java index 12cd65016bc..3277c800c3f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java @@ -19,10 +19,8 @@ package org.elasticsearch.cluster.routing.allocation; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -38,11 +36,7 @@ public class SameShardNodeAllocation extends NodeAllocation { super(settings); } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { - return false; - } - - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { for (MutableShardRouting current : node.shards()) { // we do not allow for two shards of the same shard id to exists on the same node if (current.shardId().equals(shardRouting.shardId())) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java index 5d78917b3fa..2c0ef7b64a3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java @@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -60,15 +59,16 @@ public class ShardsAllocation extends AbstractComponent { * *

If the same instance of the routing table is returned, then no change has been made. */ - public RoutingTable applyStartedShards(ClusterState clusterState, List startedShards) { + public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards) { RoutingNodes routingNodes = clusterState.routingNodes(); - nodeAllocations.applyStartedShards(nodeAllocations, routingNodes, clusterState.nodes(), startedShards); + StartedRerouteAllocation allocation = new StartedRerouteAllocation(routingNodes, clusterState.nodes(), startedShards); + nodeAllocations.applyStartedShards(nodeAllocations, allocation); boolean changed = applyStartedShards(routingNodes, startedShards); if (!changed) { - return clusterState.routingTable(); + return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } - reroute(routingNodes, clusterState.nodes()); - return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); + reroute(allocation); + return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } /** @@ -76,16 +76,17 @@ public class ShardsAllocation extends AbstractComponent { * *

If the same instance of the routing table is returned, then no change has been made. */ - public RoutingTable applyFailedShards(ClusterState clusterState, List failedShards) { + public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { RoutingNodes routingNodes = clusterState.routingNodes(); - nodeAllocations.applyFailedShards(nodeAllocations, routingNodes, clusterState.nodes(), failedShards); - boolean changed = applyFailedShards(routingNodes, failedShards); + FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShards); + nodeAllocations.applyFailedShards(nodeAllocations, allocation); + boolean changed = applyFailedShards(allocation); if (!changed) { - return clusterState.routingTable(); + return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } // If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards // reroute(routingNodes, clusterState.nodes()); - return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); + return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } /** @@ -93,45 +94,46 @@ public class ShardsAllocation extends AbstractComponent { * *

If the same instance of the routing table is returned, then no change has been made. */ - public RoutingTable reroute(ClusterState clusterState) { + public RoutingAllocation.Result reroute(ClusterState clusterState) { RoutingNodes routingNodes = clusterState.routingNodes(); - if (!reroute(routingNodes, clusterState.nodes())) { - return clusterState.routingTable(); + RoutingAllocation allocation = new RoutingAllocation(routingNodes, clusterState.nodes()); + if (!reroute(allocation)) { + return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } - return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); + return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } - private boolean reroute(RoutingNodes routingNodes, DiscoveryNodes nodes) { - Iterable dataNodes = nodes.dataNodes().values(); + private boolean reroute(RoutingAllocation allocation) { + Iterable dataNodes = allocation.nodes().dataNodes().values(); boolean changed = false; // first, clear from the shards any node id they used to belong to that is now dead - changed |= deassociateDeadNodes(routingNodes, dataNodes); + changed |= deassociateDeadNodes(allocation.routingNodes(), dataNodes); // create a sorted list of from nodes with least number of shards to the maximum ones - applyNewNodes(routingNodes, dataNodes); + applyNewNodes(allocation.routingNodes(), dataNodes); // elect primaries *before* allocating unassigned, so backups of primaries that failed // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*) - changed |= electPrimaries(routingNodes); + changed |= electPrimaries(allocation.routingNodes()); // now allocate all the unassigned to available nodes - if (routingNodes.hasUnassigned()) { - changed |= nodeAllocations.allocateUnassigned(nodeAllocations, routingNodes, nodes); - changed |= allocateUnassigned(routingNodes); + if (allocation.routingNodes().hasUnassigned()) { + changed |= nodeAllocations.allocateUnassigned(nodeAllocations, allocation); + changed |= allocateUnassigned(allocation); // elect primaries again, in case this is needed with unassigned allocation - changed |= electPrimaries(routingNodes); + changed |= electPrimaries(allocation.routingNodes()); } // rebalance - changed |= rebalance(routingNodes, nodes); + changed |= rebalance(allocation); return changed; } - private boolean rebalance(RoutingNodes routingNodes, DiscoveryNodes nodes) { + private boolean rebalance(RoutingAllocation allocation) { boolean changed = false; - List sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh(); + List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); if (sortedNodesLeastToHigh.isEmpty()) { return false; } @@ -143,7 +145,7 @@ public class ShardsAllocation extends AbstractComponent { while (lowIndex != highIndex) { RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex); RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex); - int averageNumOfShards = routingNodes.requiredAverageNumberOfShardsPerNode(); + int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode(); // only active shards can be removed so must count only active ones. if (highRoutingNode.numberOfOwningShards() <= averageNumOfShards) { @@ -159,11 +161,11 @@ public class ShardsAllocation extends AbstractComponent { boolean relocated = false; List startedShards = highRoutingNode.shardsWithState(STARTED); for (MutableShardRouting startedShard : startedShards) { - if (!nodeAllocations.canRebalance(startedShard, routingNodes, nodes)) { + if (!nodeAllocations.canRebalance(startedShard, allocation)) { continue; } - if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, routingNodes).allocate()) { + if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { changed = true; lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), lowRoutingNode.nodeId(), startedShard.currentNodeId(), @@ -214,8 +216,11 @@ public class ShardsAllocation extends AbstractComponent { return changed; } - private boolean allocateUnassigned(RoutingNodes routingNodes) { + private boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; + RoutingNodes routingNodes = allocation.routingNodes(); + + List nodes = routingNodes.sortedNodesLeastToHigh(); Iterator unassignedIterator = routingNodes.unassigned().iterator(); @@ -231,7 +236,7 @@ public class ShardsAllocation extends AbstractComponent { lastNode = 0; } - if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) { + if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size(); if (numberOfShardsToAllocate <= 0) { continue; @@ -250,7 +255,7 @@ public class ShardsAllocation extends AbstractComponent { MutableShardRouting shard = it.next(); // go over the nodes and try and allocate the remaining ones for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) { - if (nodeAllocations.canAllocate(shard, routingNode, routingNodes).allocate()) { + if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) { changed = true; routingNode.add(shard); it.remove(); @@ -379,15 +384,15 @@ public class ShardsAllocation extends AbstractComponent { return dirty; } - private boolean applyFailedShards(RoutingNodes routingNodes, Iterable failedShardEntries) { + private boolean applyFailedShards(FailedRerouteAllocation allocation) { boolean dirty = false; // apply shards might be called several times with the same shard, ignore it - for (ShardRouting failedShard : failedShardEntries) { + for (ShardRouting failedShard : allocation.failedShards()) { boolean shardDirty = false; boolean inRelocation = failedShard.relocatingNodeId() != null; if (inRelocation) { - RoutingNode routingNode = routingNodes.nodesToShards().get(failedShard.currentNodeId()); + RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId()); if (routingNode != null) { Iterator shards = routingNode.iterator(); while (shards.hasNext()) { @@ -403,7 +408,7 @@ public class ShardsAllocation extends AbstractComponent { } String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId(); - RoutingNode currentRoutingNode = routingNodes.nodesToShards().get(nodeId); + RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId); if (currentRoutingNode == null) { // already failed (might be called several times for the same shard) @@ -439,10 +444,10 @@ public class ShardsAllocation extends AbstractComponent { // not in relocation so find a new target. boolean allocated = false; - List sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh(); + List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); for (RoutingNode target : sortedNodesLeastToHigh) { if (!target.nodeId().equals(failedShard.currentNodeId()) && - nodeAllocations.canAllocate(failedShard, target, routingNodes).allocate()) { + nodeAllocations.canAllocate(failedShard, target, allocation).allocate()) { target.add(new MutableShardRouting(failedShard.index(), failedShard.id(), target.nodeId(), failedShard.relocatingNodeId(), failedShard.primary(), INITIALIZING)); @@ -452,7 +457,7 @@ public class ShardsAllocation extends AbstractComponent { } if (!allocated) { // we did not manage to allocate it, put it in the unassigned - routingNodes.unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), + allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), null, failedShard.primary(), ShardRoutingState.UNASSIGNED)); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java new file mode 100644 index 00000000000..74ac249ca89 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; + +import java.util.List; + +/** + * @author kimchy (shay.banon) + */ +public class StartedRerouteAllocation extends RoutingAllocation { + + private final List startedShards; + + public StartedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { + super(routingNodes, nodes); + this.startedShards = startedShards; + } + + public List startedShards() { + return startedShards; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java index 9522353e16e..ba0ee3277e8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java @@ -19,8 +19,10 @@ package org.elasticsearch.cluster.routing.allocation; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -37,14 +39,10 @@ public class ThrottlingNodeAllocation extends NodeAllocation { this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1); } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { - return false; - } - - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary()) { boolean primaryUnassigned = false; - for (MutableShardRouting shard : routingNodes.unassigned()) { + for (MutableShardRouting shard : allocation.routingNodes().unassigned()) { if (shard.shardId().equals(shardRouting.shardId())) { primaryUnassigned = true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index 0b8f84c1a8a..ad8cf43dab0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -26,8 +26,7 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.NodeAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.*; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -45,7 +44,6 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.transport.ConnectTransportException; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -74,23 +72,26 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { - for (ShardRouting shardRouting : startedShards) { + @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + for (ShardRouting shardRouting : allocation.startedShards()) { cachedCommitPoints.remove(shardRouting.shardId()); cachedStores.remove(shardRouting.shardId()); } } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { - for (ShardRouting shardRouting : failedShards) { + @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + for (ShardRouting shardRouting : allocation.failedShards()) { cachedCommitPoints.remove(shardRouting.shardId()); cachedStores.remove(shardRouting.shardId()); } } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { boolean changed = false; + DiscoveryNodes nodes = allocation.nodes(); + RoutingNodes routingNodes = allocation.routingNodes(); + if (nodes.dataNodes().isEmpty()) { return changed; } @@ -119,7 +120,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { continue; } // if its THROTTLING, we are not going to allocate it to this node, so ignore it as well - if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) { + if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { canBeAllocatedToAtLeastOneNode = true; break; } @@ -153,7 +154,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { // check if we can allocate on that node... // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (nodeAllocations.canAllocate(shard, node, routingNodes) == Decision.NO) { + if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) { continue; } @@ -249,7 +250,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { } if (lastNodeMatched != null) { - if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) { + if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) { if (logger.isTraceEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } @@ -333,8 +334,4 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { } return shardStores; } - - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { - return Decision.YES; - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index 9dd2619d383..f3c234312ae 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -25,8 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; -import org.elasticsearch.cluster.routing.allocation.NodeAllocation; -import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.*; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Tuple; @@ -47,7 +46,6 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.transport.ConnectTransportException; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -82,26 +80,26 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { this.initialShards = componentSettings.get("initial_shards", "quorum"); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards) { - for (ShardRouting shardRouting : startedShards) { + @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + for (ShardRouting shardRouting : allocation.startedShards()) { cachedStores.remove(shardRouting.shardId()); } } - @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { - for (ShardRouting shardRouting : failedShards) { + @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + for (ShardRouting shardRouting : allocation.failedShards()) { cachedStores.remove(shardRouting.shardId()); } - for (ShardRouting failedShard : failedShards) { - IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(failedShard.index()); - if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) { + for (ShardRouting failedShard : allocation.failedShards()) { + IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index()); + if (!allocation.routingNodes().blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) { continue; } // we are still in the initial allocation, find another node with existing shards // all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign Set nodesIds = Sets.newHashSet(); - nodesIds.addAll(nodes.dataNodes().keySet()); + nodesIds.addAll(allocation.nodes().dataNodes().keySet()); TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = listGatewayStartedShards.list(nodesIds, null).actionGet(); // make a list of ShardId to Node, each one from the latest version @@ -125,7 +123,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } if (t != null) { // we found a node to allocate to, do it - RoutingNode currentRoutingNode = routingNodes.nodesToShards().get(failedShard.currentNodeId()); + RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId()); if (currentRoutingNode == null) { // already failed (might be called several times for the same shard) continue; @@ -142,7 +140,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } } - RoutingNode targetNode = routingNodes.nodesToShards().get(t.v1().id()); + RoutingNode targetNode = allocation.routingNodes().nodesToShards().get(t.v1().id()); targetNode.add(new MutableShardRouting(failedShard.index(), failedShard.id(), targetNode.nodeId(), failedShard.relocatingNodeId(), failedShard.primary(), INITIALIZING)); @@ -150,8 +148,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { boolean changed = false; + DiscoveryNodes nodes = allocation.nodes(); + RoutingNodes routingNodes = allocation.routingNodes(); for (IndexRoutingTable indexRoutingTable : routingNodes.routingTable()) { // only do the allocation if there is a local "INDEX NOT RECOVERED" block @@ -266,7 +266,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { continue; } // if its THROTTLING, we are not going to allocate it to this node, so ignore it as well - if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) { + if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { canBeAllocatedToAtLeastOneNode = true; break; } @@ -300,7 +300,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { // check if we can allocate on that node... // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (nodeAllocations.canAllocate(shard, node, routingNodes) == Decision.NO) { + if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) { continue; } @@ -335,7 +335,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } if (lastNodeMatched != null) { - if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) { + if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) { if (logger.isTraceEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java index 657da2adcc5..e0f209bdfbc 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java @@ -65,19 +65,19 @@ public class ElectReplicaAsPrimaryDuringRelocationTests { logger.info("Adding two nodes and performing rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the primary shards"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the replica shards"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -89,7 +89,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests { logger.info("Start another node and perform rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("find the replica shard that gets relocated"); @@ -105,7 +105,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests { logger.info("kill the node [{}] of the primary shard for the relocating replica", indexShardRoutingTable.primaryShard().currentNodeId()); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove(indexShardRoutingTable.primaryShard().currentNodeId())).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("make sure all the primary shards are active"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index ee32fc0c2fe..ef419346772 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -68,13 +68,13 @@ public class FailedShardsRoutingTests { logger.info("Adding two nodes and performing rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the shards (primaries)"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -92,7 +92,7 @@ public class FailedShardsRoutingTests { logger.info("Start the shards (backups)"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -110,7 +110,7 @@ public class FailedShardsRoutingTests { logger.info("Adding third node and reroute"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -125,7 +125,7 @@ public class FailedShardsRoutingTests { logger.info("Fail the shards on node 3"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)); + routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -137,7 +137,7 @@ public class FailedShardsRoutingTests { logger.info("Do another reroute, should try and assign again to node 3"); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -152,7 +152,7 @@ public class FailedShardsRoutingTests { logger.info("Start the shards on node 3"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -192,7 +192,7 @@ public class FailedShardsRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -210,7 +210,7 @@ public class FailedShardsRoutingTests { logger.info("Add another node and perform rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); // nothing will change, since primary shards have not started yet @@ -229,7 +229,7 @@ public class FailedShardsRoutingTests { logger.info("Start the primary shards"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -246,14 +246,14 @@ public class FailedShardsRoutingTests { logger.info("Reroute, nothing should change"); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Fail backup shards on node2"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; List failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING); - routingTable = strategy.applyFailedShards(clusterState, failedShards); + routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -270,7 +270,7 @@ public class FailedShardsRoutingTests { } // fail them again... - routingTable = strategy.applyFailedShards(clusterState, failedShards); + routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 1a6f4d40823..be52324394e 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -64,25 +64,25 @@ public class PrimaryElectionRoutingTests { logger.info("Adding two nodes and performing rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the backup shard (on node2)"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Adding third node and reroute and kill first node"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3")).remove("node1")).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java index 01b31ccd45a..3587a57f90f 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java @@ -64,19 +64,19 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests { logger.info("Adding two nodes and performing rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = clusterState.routingNodes(); - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); logger.info("start another node, replica will start recovering form primary"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); @@ -84,7 +84,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests { logger.info("start another node, make sure the primary is not relocated"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index 5e0e7298eaa..5f496172e25 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -74,7 +74,7 @@ public class RebalanceAfterActiveTests { logger.info("start two nodes and fully start the shards"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { @@ -86,7 +86,7 @@ public class RebalanceAfterActiveTests { logger.info("start all the primary shards, replicas will start initializing"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -101,7 +101,7 @@ public class RebalanceAfterActiveTests { .put(newNode("node3")).put(newNode("node4")).put(newNode("node5")).put(newNode("node6")).put(newNode("node7")).put(newNode("node8")).put(newNode("node9")).put(newNode("node10"))) .build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -114,7 +114,7 @@ public class RebalanceAfterActiveTests { logger.info("start the replica shards, rebalancing should start"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -125,7 +125,7 @@ public class RebalanceAfterActiveTests { logger.info("complete relocation, other half of relocation should happen"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -136,7 +136,7 @@ public class RebalanceAfterActiveTests { logger.info("complete relocation, thats it!"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java index 06b8fc35373..b111eacf96f 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java @@ -73,7 +73,7 @@ public class ReplicaAllocatedAfterPrimaryTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -89,7 +89,7 @@ public class ReplicaAllocatedAfterPrimaryTests { logger.info("Start all the primary shards"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index 66483e4baf4..d5b0f68cffe 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -78,7 +78,7 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Adding one node and performing rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -90,14 +90,14 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Rerouting again, nothing should change"); prevRoutingTable = routingTable; clusterState = newClusterStateBuilder().state(clusterState).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); assertThat(routingTable == prevRoutingTable, equalTo(true)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Marking the shard as started"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable != prevRoutingTable, equalTo(true)); @@ -110,7 +110,7 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Starting another node and making sure nothing changed"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable == prevRoutingTable, equalTo(true)); @@ -124,7 +124,7 @@ public class SingleShardNoReplicasRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove("node1")).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable != prevRoutingTable, equalTo(true)); @@ -137,14 +137,14 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Start another node, make sure that things remain the same (shard is in node2 and initializing)"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable == prevRoutingTable, equalTo(true)); logger.info("Start the shard on node 2"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable != prevRoutingTable, equalTo(true)); @@ -179,7 +179,7 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Adding one node and rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -193,7 +193,7 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Marking the shard as failed"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -240,7 +240,7 @@ public class SingleShardNoReplicasRoutingTests { } RoutingTable prevRoutingTable = routingTable; clusterState = newClusterStateBuilder().state(clusterState).nodes(nodesBuilder).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -278,14 +278,14 @@ public class SingleShardNoReplicasRoutingTests { } prevRoutingTable = routingTable; clusterState = newClusterStateBuilder().state(clusterState).nodes(nodesBuilder).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(false)); logger.info("Marking the shard as started"); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -339,7 +339,7 @@ public class SingleShardNoReplicasRoutingTests { .nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))) .build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -362,13 +362,13 @@ public class SingleShardNoReplicasRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); assertThat(prevRoutingTable == routingTable, equalTo(true)); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -384,7 +384,7 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Now, mark the relocated as started"); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); // routingTable = strategy.reroute(new RoutingStrategyInfo(metaData, routingTable), nodes); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java index 8bb796586e1..485f65368a0 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java @@ -73,7 +73,7 @@ public class SingleShardOneReplicaRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -89,7 +89,7 @@ public class SingleShardOneReplicaRoutingTests { logger.info("Add another node and perform rerouting, nothing will happen since primary shards not started"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable == routingTable, equalTo(true)); @@ -97,7 +97,7 @@ public class SingleShardOneReplicaRoutingTests { logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -114,13 +114,13 @@ public class SingleShardOneReplicaRoutingTests { logger.info("Reroute, nothing should change"); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the backup shard"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -137,7 +137,7 @@ public class SingleShardOneReplicaRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove("node1")).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -155,7 +155,7 @@ public class SingleShardOneReplicaRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index faa5192c4d9..2f3f653add2 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -75,7 +75,7 @@ public class TenShardsOneReplicaRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -93,7 +93,7 @@ public class TenShardsOneReplicaRoutingTests { logger.info("Add another node and perform rerouting, nothing will happen since primary not started"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable == routingTable, equalTo(true)); @@ -101,7 +101,7 @@ public class TenShardsOneReplicaRoutingTests { logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -119,13 +119,13 @@ public class TenShardsOneReplicaRoutingTests { logger.info("Reroute, nothing should change"); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the backup shard"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -146,7 +146,7 @@ public class TenShardsOneReplicaRoutingTests { logger.info("Add another node and perform rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -161,7 +161,7 @@ public class TenShardsOneReplicaRoutingTests { logger.info("Start the shards on node 3"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index 18e582ef49f..7203a37d12e 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -62,7 +62,7 @@ public class ThrottlingAllocationTests { logger.info("start one node, do reroute, only 3 should initialize"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0)); @@ -70,7 +70,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(17)); logger.info("start initializing, another 3 should initialize"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3)); @@ -78,7 +78,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(14)); logger.info("start initializing, another 3 should initialize"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(6)); @@ -86,7 +86,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(11)); logger.info("start initializing, another 1 should initialize"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(9)); @@ -94,7 +94,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(10)); logger.info("start initializing, all primaries should be started"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); @@ -119,7 +119,7 @@ public class ThrottlingAllocationTests { logger.info("start one node, do reroute, only 3 should initialize"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0)); @@ -127,7 +127,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(7)); logger.info("start initializing, another 2 should initialize"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3)); @@ -135,7 +135,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(5)); logger.info("start initializing, all primaries should be started"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); @@ -144,7 +144,7 @@ public class ThrottlingAllocationTests { logger.info("start another node, replicas should start being allocated"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); @@ -152,7 +152,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(2)); logger.info("start initializing replicas"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8)); @@ -160,7 +160,7 @@ public class ThrottlingAllocationTests { assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); logger.info("start initializing replicas, all should be started"); - routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java index 34fde7b2bde..a73c2b9c4b9 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java @@ -56,19 +56,19 @@ public class UpdateNumberOfReplicasTests { clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start all the primary shards"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start all the replica shards"); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -103,7 +103,7 @@ public class UpdateNumberOfReplicasTests { logger.info("Add another node and start the added replica"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build(); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -119,7 +119,7 @@ public class UpdateNumberOfReplicasTests { routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -153,7 +153,7 @@ public class UpdateNumberOfReplicasTests { logger.info("do a reroute, should remain the same"); prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.reroute(clusterState).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(false)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index 7a84fe155fb..930f781ab0b 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -57,7 +57,7 @@ public class ClusterSerializationTests { ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); ShardsAllocation strategy = new ShardsAllocation(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState)).build(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build(); ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), ImmutableSettings.settingsBuilder().build(), newNode("node1")); @@ -79,7 +79,7 @@ public class ClusterSerializationTests { ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); ShardsAllocation strategy = new ShardsAllocation(); - RoutingTable source = strategy.reroute(clusterState); + RoutingTable source = strategy.reroute(clusterState).routingTable(); BytesStreamOutput outStream = new BytesStreamOutput(); RoutingTable.Builder.writeTo(source, outStream);