diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index fbadeeef0c2..b5a4e66525f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -333,6 +333,11 @@ public abstract class TransportShardReplicationOperationAction { return count; } - public List shardsOfType(ShardRoutingState state) { + public List shardsWithState(ShardRoutingState... state) { List shards = newArrayList(); for (RoutingNode routingNode : this) { shards.addAll(routingNode.shardsWithState(state)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java index fb1d64e1091..292b267e4c7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.strategy; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -61,6 +62,7 @@ public class ShardsRoutingStrategy extends AbstractComponent { if (!applyStartedShards(routingNodes, startedShardEntries)) { return clusterState.routingTable(); } + reroute(routingNodes, clusterState.nodes()); return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); } @@ -74,6 +76,8 @@ public class ShardsRoutingStrategy extends AbstractComponent { if (!applyFailedShards(routingNodes, failedShardEntries)) { return clusterState.routingTable(); } + // If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards +// reroute(routingNodes, clusterState.nodes()); return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); } @@ -84,8 +88,14 @@ public class ShardsRoutingStrategy extends AbstractComponent { */ public RoutingTable reroute(ClusterState clusterState) { RoutingNodes routingNodes = clusterState.routingNodes(); + if (!reroute(routingNodes, clusterState.nodes())) { + return clusterState.routingTable(); + } + return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); + } - Iterable dataNodes = clusterState.nodes().dataNodes().values(); + private boolean reroute(RoutingNodes routingNodes, DiscoveryNodes nodes) { + Iterable dataNodes = nodes.dataNodes().values(); boolean changed = false; // first, clear from the shards any node id they used to belong to that is now dead @@ -111,11 +121,7 @@ public class ShardsRoutingStrategy extends AbstractComponent { // rebalance changed |= rebalance(routingNodes); - if (!changed) { - return clusterState.routingTable(); - } - - return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); + return changed; } private boolean rebalance(RoutingNodes routingNodes) { @@ -207,6 +213,13 @@ public class ShardsRoutingStrategy extends AbstractComponent { int lastNode = 0; while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); + if (!shard.primary()) { + // if its a backup, only allocate it if the primary is active + MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); + if (primary == null || !primary.active()) { + continue; + } + } for (int i = 0; i < nodes.size(); i++) { RoutingNode node = nodes.get(lastNode); lastNode++; @@ -229,12 +242,19 @@ public class ShardsRoutingStrategy extends AbstractComponent { // allocate all the unassigned shards above the average per node. for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext();) { - MutableShardRouting shardRoutingEntry = it.next(); + MutableShardRouting shard = it.next(); + if (!shard.primary()) { + // if its a backup, only allocate it if the primary is active + MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); + if (primary == null || !primary.active()) { + continue; + } + } // go over the nodes and try and allocate the remaining ones for (RoutingNode routingNode : routingNodes.nodesToShards().values()) { - if (routingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && routingNode.canAllocate(shardRoutingEntry)) { + if (routingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && routingNode.canAllocate(shard)) { changed = true; - routingNode.add(shardRoutingEntry); + routingNode.add(shard); it.remove(); break; } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/BackupAllocatedAfterPrimaryTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/BackupAllocatedAfterPrimaryTests.java new file mode 100644 index 00000000000..d17c8b324dc --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/BackupAllocatedAfterPrimaryTests.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.strategy; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class BackupAllocatedAfterPrimaryTests { + + private final ESLogger logger = Loggers.getLogger(BackupAllocatedAfterPrimaryTests.class); + + @Test public void testBackupIsAllocatedAfterPrimary() { + + ShardsRoutingStrategy strategy = new ShardsRoutingStrategy(); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(0).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(0).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(0).shards().get(1).currentNodeId(), nullValue()); + + logger.info("Adding one node and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); + + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(0).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue()); + + logger.info("Start all the primary shards"); + RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(0).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2")); + + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE); + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/FailedShardsRoutingTests.java index 443e03eb1d0..9b844a93510 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/FailedShardsRoutingTests.java @@ -70,16 +70,10 @@ public class FailedShardsRoutingTests { routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - logger.info("Start the primary shard (on node1)"); + logger.info("Start the shards (primaries)"); RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - - logger.info("Start the backup shard (on node2)"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -88,11 +82,28 @@ public class FailedShardsRoutingTests { assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); + assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1"))); + } + + logger.info("Start the shards (backups)"); + routingNodes = routingTable.routingNodes(clusterState.metaData()); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(STARTED)); - assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2")); + assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1"))); } logger.info("Adding third node and reroute"); @@ -201,7 +212,8 @@ public class FailedShardsRoutingTests { routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - assertThat(prevRoutingTable != routingTable, equalTo(true)); + // nothing will change, since primary shards have not started yet + assertThat(prevRoutingTable == routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); @@ -209,15 +221,14 @@ public class FailedShardsRoutingTests { assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService - assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2")); + assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue()); } - logger.info("Start the primary shard (on node1)"); + logger.info("Start the primary shards"); RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -226,11 +237,10 @@ public class FailedShardsRoutingTests { assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2")); + assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1"))); } logger.info("Reroute, nothing should change"); @@ -238,7 +248,7 @@ public class FailedShardsRoutingTests { routingTable = strategy.reroute(clusterState); assertThat(prevRoutingTable == routingTable, equalTo(true)); - logger.info("Fail the backup shards"); + logger.info("Fail backup shards on node2"); routingNodes = routingTable.routingNodes(metaData); prevRoutingTable = routingTable; List failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING); @@ -254,7 +264,6 @@ public class FailedShardsRoutingTests { assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED)); assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue()); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardNoBackupsRoutingStrategyTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardNoBackupsRoutingStrategyTests.java index 8056dec794d..31e4c9067f5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardNoBackupsRoutingStrategyTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardNoBackupsRoutingStrategyTests.java @@ -47,7 +47,7 @@ import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SingleShardNoBackupsRoutingStrategyTests { @@ -283,26 +283,7 @@ public class SingleShardNoBackupsRoutingStrategyTests { logger.info("Marking the shard as started"); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsOfType(INITIALIZING)); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - - assertThat(routingTable != prevRoutingTable, equalTo(true)); - for (int i = 0; i < numberOfIndices; i++) { - assertThat(routingTable.index("test" + i).shards().size(), equalTo(1)); - assertThat(routingTable.index("test" + i).shard(0).size(), equalTo(1)); - assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1)); - assertThat(routingTable.index("test" + i).shard(0).shards().get(0).unassigned(), equalTo(false)); - assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(STARTED)); - assertThat(routingTable.index("test" + i).shard(0).shards().get(0).primary(), equalTo(true)); - // make sure we still have 2 shards initializing per node on the first 25 nodes - String nodeId = routingTable.index("test" + i).shard(0).shards().get(0).currentNodeId(); - int nodeIndex = Integer.parseInt(nodeId.substring("node".length())); - assertThat(nodeIndex, lessThan(25)); - } - - logger.info("Perform another round of reroute after we started the shards (we don't do automatic reroute when applying started shards)"); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); @@ -351,7 +332,7 @@ public class SingleShardNoBackupsRoutingStrategyTests { assertThat(routingTable.indicesRouting().size(), equalTo(numberOfIndices)); - logger.info("Starting 3 nodes and retouring"); + logger.info("Starting 3 nodes and rerouting"); clusterState = newClusterStateBuilder().state(clusterState) .nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))) .build(); @@ -385,26 +366,9 @@ public class SingleShardNoBackupsRoutingStrategyTests { routingNodes = routingTable.routingNodes(metaData); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsOfType(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - assertThat(prevRoutingTable != routingTable, equalTo(true)); - for (int i = 0; i < numberOfIndices; i++) { - assertThat(routingTable.index("test" + i).shards().size(), equalTo(1)); - assertThat(routingTable.index("test" + i).shard(0).size(), equalTo(1)); - assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1)); - assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(STARTED)); - } - routingNodes = routingTable.routingNodes(metaData); - assertThat(routingNodes.numberOfShardsOfType(STARTED), equalTo(numberOfIndices)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), anyOf(equalTo(3), equalTo(4))); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), anyOf(equalTo(3), equalTo(4))); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), anyOf(equalTo(3), equalTo(4))); - - logger.info("Now, reroute so we start the relocation process for even distribution (4 should be relocated)"); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); for (int i = 0; i < numberOfIndices; i++) { assertThat(routingTable.index("test" + i).shards().size(), equalTo(1)); @@ -418,7 +382,7 @@ public class SingleShardNoBackupsRoutingStrategyTests { logger.info("Now, mark the relocated as started"); prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsOfType(INITIALIZING)); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); // routingTable = strategy.reroute(new RoutingStrategyInfo(metaData, routingTable), nodes); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardOneBackupRoutingStrategyTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardOneBackupRoutingStrategyTests.java index 00680868137..79a62f81794 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardOneBackupRoutingStrategyTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/SingleShardOneBackupRoutingStrategyTests.java @@ -39,7 +39,7 @@ import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SingleShardOneBackupRoutingStrategyTests { @@ -85,22 +85,13 @@ public class SingleShardOneBackupRoutingStrategyTests { assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED)); assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue()); - logger.info("Add another node and perform rerouting"); + logger.info("Add another node and perform rerouting, nothing will happen since primary shards not started"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(1)); - assertThat(routingTable.index("test").shard(0).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1")); - assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService - assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2")); + assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/TenShardsOneBackupRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/TenShardsOneBackupRoutingTests.java index 1a323366039..03832fce198 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/TenShardsOneBackupRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/TenShardsOneBackupRoutingTests.java @@ -39,7 +39,7 @@ import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TenShardsOneBackupRoutingTests { @@ -89,24 +89,13 @@ public class TenShardsOneBackupRoutingTests { assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue()); } - logger.info("Add another node and perform rerouting"); + logger.info("Add another node and perform rerouting, nothing will happen since primary not started"); clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); - assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService - assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2")); - } + assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());