diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 58f9959b83e..f6c2978fbcd 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -101,6 +101,7 @@
queryparser
rackspace
rebalance
+ rebalancing
regex
reparse
retrans
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
index 66f062cd16b..1c039a120fd 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
@@ -31,9 +31,12 @@ import org.elasticsearch.common.util.concurrent.Immutable;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.elasticsearch.common.collect.Lists.*;
+
/**
* @author kimchy (Shay Banon)
*/
@@ -102,6 +105,14 @@ public class IndexRoutingTable implements Iterable {
return shards.get(shardId);
}
+ public List shardsWithState(ShardRoutingState... states) {
+ List shards = newArrayList();
+ for (IndexShardRoutingTable shardRoutingTable : this) {
+ shards.addAll(shardRoutingTable.shardsWithState(states));
+ }
+ return shards;
+ }
+
/**
* A group shards iterator where each group ({@link ShardsIterator}
* is an iterator across shard replication group.
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
index 11faca550d3..c5f04d7c011 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
@@ -117,6 +117,18 @@ public class IndexShardRoutingTable implements Iterable {
return replicaShards;
}
+ public List shardsWithState(ShardRoutingState... states) {
+ List shards = newArrayList();
+ for (ShardRouting shardEntry : this) {
+ for (ShardRoutingState state : states) {
+ if (shardEntry.state() == state) {
+ shards.add(shardEntry);
+ }
+ }
+ }
+ return shards;
+ }
+
int nextCounter() {
return counter.getAndIncrement();
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index 6267b67ffe3..be291aa6d56 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -126,7 +126,7 @@ public class RoutingNodes implements Iterable {
return nodesToShards.get(nodeId);
}
- public MutableShardRouting findPrimaryForBackup(MutableShardRouting shard) {
+ public MutableShardRouting findPrimaryForReplica(MutableShardRouting shard) {
assert !shard.primary();
for (RoutingNode routingNode : nodesToShards.values()) {
for (MutableShardRouting shardRouting : routingNode) {
@@ -138,6 +138,27 @@ public class RoutingNodes implements Iterable {
return null;
}
+ public List shardsRoutingFor(ShardRouting shardRouting) {
+ return shardsRoutingFor(shardRouting.index(), shardRouting.id());
+ }
+
+ public List shardsRoutingFor(String index, int shardId) {
+ List shards = newArrayList();
+ for (RoutingNode routingNode : this) {
+ for (MutableShardRouting shardRouting : routingNode) {
+ if (shardRouting.index().equals(index) && shardRouting.id() == shardId) {
+ shards.add(shardRouting);
+ }
+ }
+ }
+ for (MutableShardRouting shardRouting : unassigned) {
+ if (shardRouting.index().equals(index) && shardRouting.id() == shardId) {
+ shards.add(shardRouting);
+ }
+ }
+ return shards;
+ }
+
public int numberOfShardsOfType(ShardRoutingState state) {
int count = 0;
for (RoutingNode routingNode : this) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index e4a326ad567..d46f749e236 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.Maps.*;
/**
@@ -90,6 +91,14 @@ public class RoutingTable implements Iterable {
return validation;
}
+ public List shardsWithState(ShardRoutingState... states) {
+ List shards = newArrayList();
+ for (IndexRoutingTable indexRoutingTable : this) {
+ shards.addAll(indexRoutingTable.shardsWithState(states));
+ }
+ return shards;
+ }
+
/**
* All the shards (replicas) for the provided indices.
*
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java
index fb53ebe2244..4d43fa99cf9 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java
@@ -107,7 +107,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
if (!shard.primary()) {
// if its a backup, only allocate it if the primary is active
- MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
+ MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) {
continue;
}
@@ -209,7 +209,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
// if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it
// note, since we replicate operations, this might not be the same (different flush intervals)
if (!shard.primary()) {
- MutableShardRouting primaryShard = routingNodes.findPrimaryForBackup(shard);
+ MutableShardRouting primaryShard = routingNodes.findPrimaryForReplica(shard);
if (primaryShard != null && primaryShard.active()) {
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeStoreFileMetaData = nodesStoreFilesMetaData.nodesMap().get(primaryShard.currentNodeId());
if (primaryNodeStoreFileMetaData != null && primaryNodeStoreFileMetaData.storeFilesMetaData() != null && primaryNodeStoreFileMetaData.storeFilesMetaData().allocated()) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java
index 7aec8c3dffa..8e870818d0a 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java
@@ -158,6 +158,19 @@ public class ShardsRoutingStrategy extends AbstractComponent {
boolean relocated = false;
List activeShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting activeShard : activeShards) {
+ // we only relocate shards that all other shards within the replication group are active
+ List allShards = routingNodes.shardsRoutingFor(activeShard);
+ boolean ignoreShard = false;
+ for (MutableShardRouting allShard : allShards) {
+ if (!allShard.active()) {
+ ignoreShard = true;
+ break;
+ }
+ }
+ if (ignoreShard) {
+ continue;
+ }
+
if (lowRoutingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && lowRoutingNode.canAllocate(activeShard)) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(activeShard.index(), activeShard.id(),
@@ -217,13 +230,15 @@ public class ShardsRoutingStrategy extends AbstractComponent {
int lastNode = 0;
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
+ // if its a replica, only allocate it if the primary is active
if (!shard.primary()) {
- // if its a backup, only allocate it if the primary is active
- MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
+ MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) {
continue;
}
}
+
+ // do the allocation, finding the least "busy" node
for (int i = 0; i < nodes.size(); i++) {
RoutingNode node = nodes.get(lastNode);
lastNode++;
@@ -247,9 +262,9 @@ public class ShardsRoutingStrategy extends AbstractComponent {
// allocate all the unassigned shards above the average per node.
for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext();) {
MutableShardRouting shard = it.next();
+ // if its a backup, only allocate it if the primary is active
if (!shard.primary()) {
- // if its a backup, only allocate it if the primary is active
- MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
+ MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) {
continue;
}
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java
new file mode 100644
index 00000000000..777fa194276
--- /dev/null
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.strategy;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.testng.annotations.Test;
+
+import static org.elasticsearch.cluster.ClusterState.*;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
+import static org.elasticsearch.cluster.metadata.MetaData.*;
+import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
+import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class RebalanceAfterActiveTests {
+
+ private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class);
+
+ @Test public void testRebalanceOnlyAfterAllShardsAreActive() {
+
+ ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
+
+ logger.info("Building initial routing table");
+
+ MetaData metaData = newMetaDataBuilder()
+ .put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1))
+ .build();
+
+ RoutingTable routingTable = routingTable()
+ .add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
+ .build();
+
+ ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
+
+ assertThat(routingTable.index("test").shards().size(), equalTo(5));
+ for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+ assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
+ assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
+ assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
+ assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
+ assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
+ }
+
+ logger.info("start two nodes and fully start the shards");
+ clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
+ RoutingTable prevRoutingTable = routingTable;
+ routingTable = strategy.reroute(clusterState);
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+ for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+ assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
+ assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
+ assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
+ }
+
+ logger.info("start all the primary shards, replicas will start initializing");
+ RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+ prevRoutingTable = routingTable;
+ routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+ routingNodes = routingTable.routingNodes(metaData);
+
+ for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+ assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
+ assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
+ assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
+ }
+
+ logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened");
+ clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
+ .put(newNode("node3")).put(newNode("node4")).put(newNode("node5")).put(newNode("node6")).put(newNode("node7")).put(newNode("node8")).put(newNode("node9")).put(newNode("node10")))
+ .build();
+ prevRoutingTable = routingTable;
+ routingTable = strategy.reroute(clusterState);
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+ routingNodes = routingTable.routingNodes(metaData);
+
+ for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+ assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
+ assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
+ assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
+ }
+
+ logger.info("start the replica shards, rebalancing should start");
+ routingNodes = routingTable.routingNodes(clusterState.metaData());
+ prevRoutingTable = routingTable;
+ routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+ routingNodes = routingTable.routingNodes(metaData);
+
+ // we only allow one relocation at a time
+ assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
+ assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
+
+ logger.info("complete relocation, other half of relocation should happen");
+ routingNodes = routingTable.routingNodes(clusterState.metaData());
+ prevRoutingTable = routingTable;
+ routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+ routingNodes = routingTable.routingNodes(metaData);
+
+ // we now only relocate 3, since 2 remain where they are!
+ assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
+ assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
+
+ logger.info("complete relocation, thats it!");
+ routingNodes = routingTable.routingNodes(clusterState.metaData());
+ prevRoutingTable = routingTable;
+ routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+ routingNodes = routingTable.routingNodes(metaData);
+
+ assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
+ // make sure we have an even relocation
+ for (RoutingNode routingNode : routingNodes) {
+ assertThat(routingNode.shards().size(), equalTo(1));
+ }
+ }
+
+ private DiscoveryNode newNode(String nodeId) {
+ return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
+ }
+}