From 3028d5a7a16cd9ccb28f1a364ac2c1d762a6e6de Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 21 Sep 2011 18:26:16 +0300 Subject: [PATCH] Shard allocation awareness (rack aware, zone aware, for example), closes #1352. --- .../cluster/routing/RoutingNodes.java | 18 + .../routing/allocation/AllocationService.java | 41 +- .../decider/AllocationDeciders.java | 1 + .../decider/AllocationDecidersModule.java | 1 + .../decider/AwarenessAllocationDecider.java | 142 ++++ .../common/settings/ImmutableSettings.java | 6 +- .../allocation/AwarenessAllocationTests.java | 727 ++++++++++++++++++ .../allocation/FilterRoutingTests.java | 2 +- 8 files changed, 923 insertions(+), 15 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index a7680da5bc0..6ee83ab92d3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -23,11 +23,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.trove.map.hash.TObjectIntHashMap; import org.elasticsearch.common.util.concurrent.NotThreadSafe; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -53,6 +55,8 @@ public class RoutingNodes implements Iterable { private final List ignoredUnassigned = newArrayList(); + private final Map> nodesPerAttributeNames = new HashMap>(); + public RoutingNodes(ClusterState clusterState) { this.metaData = clusterState.metaData(); this.blocks = clusterState.blocks(); @@ -158,6 +162,20 @@ public class RoutingNodes implements Iterable { return nodesToShards.get(nodeId); } + public TObjectIntHashMap nodesPerAttributesCounts(String attributeName) { + TObjectIntHashMap nodesPerAttributesCounts = nodesPerAttributeNames.get(attributeName); + if (nodesPerAttributesCounts != null) { + return nodesPerAttributesCounts; + } + nodesPerAttributesCounts = new TObjectIntHashMap(); + for (RoutingNode routingNode : this) { + String attrValue = routingNode.node().attributes().get(attributeName); + nodesPerAttributesCounts.adjustOrPutValue(attrValue, 1, 1); + } + nodesPerAttributeNames.put(attributeName, nodesPerAttributesCounts); + return nodesPerAttributesCounts; + } + public MutableShardRouting findPrimaryForReplica(ShardRouting shard) { assert !shard.primary(); for (RoutingNode routingNode : nodesToShards.values()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index dfe7c6f496f..5b9c21d35fd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.settings.NodeSettingsService; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -175,20 +176,36 @@ public class AllocationService extends AbstractComponent { private boolean moveShards(RoutingAllocation allocation) { boolean changed = false; - for (RoutingNode routingNode : allocation.routingNodes()) { - for (MutableShardRouting shardRouting : routingNode) { - // we can only move started shards... - if (!shardRouting.started()) { + + // create a copy of the shards interleaving between nodes, and check if they can remain + List shards = new ArrayList(); + int index = 0; + boolean found = true; + while (found) { + found = false; + for (RoutingNode routingNode : allocation.routingNodes()) { + if (index >= routingNode.shards().size()) { continue; } - if (!allocation.deciders().canRemain(shardRouting, routingNode, allocation)) { - logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); - boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation); - if (!moved) { - logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); - } else { - changed = true; - } + found = true; + shards.add(routingNode.shards().get(index)); + } + index++; + } + for (int i = 0; i < shards.size(); i++) { + MutableShardRouting shardRouting = shards.get(i); + // we can only move started shards... + if (!shardRouting.started()) { + continue; + } + RoutingNode routingNode = allocation.routingNodes().node(shardRouting.currentNodeId()); + if (!allocation.deciders().canRemain(shardRouting, routingNode, allocation)) { + logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); + boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation); + if (!moved) { + logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + } else { + changed = true; } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index f157da76735..704e9226fb1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -45,6 +45,7 @@ public class AllocationDeciders extends AllocationDecider { .add(new RebalanceOnlyWhenActiveAllocationDecider(settings)) .add(new ClusterRebalanceAllocationDecider(settings)) .add(new ConcurrentRebalanceAllocationDecider(settings)) + .add(new AwarenessAllocationDecider(settings)) .build() ); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java index be318f9eca4..675fbe8bf1f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java @@ -47,6 +47,7 @@ public class AllocationDecidersModule extends AbstractModule { allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveAllocationDecider.class); allocationMultibinder.addBinding().to(ClusterRebalanceAllocationDecider.class); allocationMultibinder.addBinding().to(ConcurrentRebalanceAllocationDecider.class); + allocationMultibinder.addBinding().to(AwarenessAllocationDecider.class); for (Class allocation : allocations) { allocationMultibinder.addBinding().to(allocation); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java new file mode 100644 index 00000000000..75c76808035 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.trove.map.hash.TObjectIntHashMap; + +import java.util.Map; + +/** + */ +public class AwarenessAllocationDecider extends AllocationDecider { + + private String[] awarenessAttributes; + + private Map forcedAwarenessAttributes; + + @Inject public AwarenessAllocationDecider(Settings settings) { + super(settings); + this.awarenessAttributes = settings.getAsArray("cluster.routing.allocation.awareness.attributes"); + + forcedAwarenessAttributes = Maps.newHashMap(); + Map forceGroups = settings.getGroups("cluster.routing.allocation.awareness.force."); + for (Map.Entry entry : forceGroups.entrySet()) { + forcedAwarenessAttributes.put(entry.getKey(), entry.getValue().getAsArray("values")); + } + } + + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return underCapacity(shardRouting, node, allocation, true) ? Decision.YES : Decision.NO; + } + + @Override public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return underCapacity(shardRouting, node, allocation, false); + } + + private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { + if (awarenessAttributes.length == 0) { + return true; + } + + IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.index()); + int shardCount = indexMetaData.numberOfReplicas() + 1; // 1 for primary + for (String awarenessAttribute : awarenessAttributes) { + // the node the shard exists on must be associated with an awareness attribute + if (!node.node().attributes().containsKey(awarenessAttribute)) { + return false; + } + + // build attr_value -> nodes map + TObjectIntHashMap nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + + // build the count of shards per attribute value + TObjectIntHashMap shardPerAttribute = new TObjectIntHashMap(); + for (RoutingNode routingNode : allocation.routingNodes()) { + for (int i = 0; i < routingNode.shards().size(); i++) { + MutableShardRouting nodeShardRouting = routingNode.shards().get(i); + if (nodeShardRouting.shardId().equals(shardRouting.shardId())) { + // if the shard is relocating, then make sure we count it as part of the node it is relocating to + if (nodeShardRouting.relocating()) { + RoutingNode relocationNode = allocation.routingNodes().node(nodeShardRouting.relocatingNodeId()); + shardPerAttribute.adjustOrPutValue(relocationNode.node().attributes().get(awarenessAttribute), 1, 1); + } else if (nodeShardRouting.started()) { + shardPerAttribute.adjustOrPutValue(routingNode.node().attributes().get(awarenessAttribute), 1, 1); + } + } + } + } + if (moveToNode) { + if (shardRouting.assignedToNode()) { + String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); + if (!node.nodeId().equals(nodeId)) { + // we work on different nodes, move counts around + shardPerAttribute.adjustOrPutValue(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), -1, 0); + shardPerAttribute.adjustOrPutValue(node.node().attributes().get(awarenessAttribute), 1, 1); + } + } else { + shardPerAttribute.adjustOrPutValue(node.node().attributes().get(awarenessAttribute), 1, 1); + } + } + + int numberOfAttributes = nodesPerAttribute.size(); + String[] fullValues = forcedAwarenessAttributes.get(awarenessAttribute); + if (fullValues != null) { + for (String fullValue : fullValues) { + if (!shardPerAttribute.contains(fullValue)) { + numberOfAttributes++; + } + } + } + // TODO should we remove ones that are not part of full list? + + int averagePerAttribute = shardCount / numberOfAttributes; + int totalLeftover = shardCount % numberOfAttributes; + int requiredCountPerAttribute; + if (averagePerAttribute == 0) { + // if we have more attributes values than shard count, no leftover + totalLeftover = 0; + requiredCountPerAttribute = 1; + } else { + requiredCountPerAttribute = averagePerAttribute; + } + int leftoverPerAttribute = totalLeftover == 0 ? 0 : 1; + + int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute)); + // if we are above with leftover, then we know we are not good, even with mod + if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) { + return false; + } + // all is well, we are below or same as average + if (currentNodeCount <= requiredCountPerAttribute) { + continue; + } + } + + return true; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java index f2c0073775f..d259c5dad61 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java @@ -237,7 +237,9 @@ public class ImmutableSettings implements Settings { if (get(settingPrefix) != null) { String[] strings = Strings.splitStringByCommaToArray(get(settingPrefix)); if (strings.length > 0) { - Collections.addAll(result, strings); + for (String string : strings) { + result.add(string.trim()); + } } } @@ -247,7 +249,7 @@ public class ImmutableSettings implements Settings { if (value == null) { break; } - result.add(value); + result.add(value.trim()); } if (result.isEmpty()) { return defaultArray; diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java new file mode 100644 index 00000000000..56304c42301 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -0,0 +1,727 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + */ +@Test +public class AwarenessAllocationTests { + + private final ESLogger logger = Loggers.getLogger(AwarenessAllocationTests.class); + + @Test public void moveShardOnceNewNodeWithAttributeAdded1() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(routingTable, sameInstance(clusterState.routingTable())); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + @Test public void moveShardOnceNewNodeWithAttributeAdded2() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node3", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node4")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node5", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(routingTable, sameInstance(clusterState.routingTable())); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + @Test public void moveShardOnceNewNodeWithAttributeAdded3() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(5)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(5)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(5)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node3")); + + logger.info("--> complete initializing"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> run it again, since we still might have relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, some more relocation should happen"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), greaterThan(0)); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + } + + @Test public void moveShardOnceNewNodeWithAttributeAdded4() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1)) + .put(newIndexMetaDataBuilder("test2").numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test1").initializeEmpty(metaData.index("test1"))) + .add(indexRoutingTable("test2").initializeEmpty(metaData.index("test2"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(10)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(10)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(10)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node3")); + + logger.info("--> complete initializing"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> run it again, since we still might have relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, some more relocation should happen"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), greaterThan(0)); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + } + + @Test public void moveShardOnceNewNodeWithAttributeAdded5() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, we will have another relocation"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node4")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + + logger.info("--> make sure another reroute does not move things"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + } + + @Test public void moveShardOnceNewNodeWithAttributeAdded6() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(3)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node3", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node4", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node5", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node5")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, we will have another relocation"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node6", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), equalTo("node6")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); + + logger.info("--> make sure another reroute does not move things"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + } + + @Test public void fullAwareness1() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> replica will not start because we have only one rack value"); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(routingTable, sameInstance(clusterState.routingTable())); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + @Test public void fullAwareness2() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node3", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> replica will not start because we have only one rack value"); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node4")); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node5", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + assertThat(routingTable, sameInstance(clusterState.routingTable())); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + @Test public void fullAwareness3() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") + .put("cluster.routing.allocation.awareness.attributes", "rack_id") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1)) + .put(newIndexMetaDataBuilder("test2").numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test1").initializeEmpty(metaData.index("test1"))) + .add(indexRoutingTable("test2").initializeEmpty(metaData.index("test2"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("rack_id", "1"))) + .put(newNode("node2", ImmutableMap.of("rack_id", "1"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(10)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3", ImmutableMap.of("rack_id", "2"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(10)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete initializing"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> run it again, since we still might have relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, some more relocation should happen"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node4", ImmutableMap.of("rack_id", "3"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), greaterThan(0)); + + logger.info("--> complete relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState).routingTable(), sameInstance(clusterState.routingTable())); + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java index c83a446fe56..756164bf6c5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java @@ -68,7 +68,7 @@ public class FilterRoutingTests { ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); - logger.info("--> adding two nodes and performing rerouting"); + logger.info("--> adding four nodes and performing rerouting"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() .put(newNode("node1", ImmutableMap.of("tag1", "value1"))) .put(newNode("node2", ImmutableMap.of("tag1", "value2")))