Allocate replica shards (backups) only after the primaries have been allocated. This results in better theoretical shard allocation and the ability to reuse local storage index files (compared against the backup).

kimchy 2010-07-06 17:58:05 +03:00
parent 1d39bb4d51
commit 070cb5b295
8 changed files with 187 additions and 99 deletions
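
In essence, the unassigned-shard loops in ShardsRoutingStrategy (see the hunks below) now skip a backup until its primary is active. A minimal sketch of that guard, reusing the names that appear in the diff:

// Sketch of the new allocation guard (mirrors the ShardsRoutingStrategy hunks below).
// An unassigned backup (replica) is only allocated once its primary is active, so the
// backup can later recover against a live primary and reuse matching local index files.
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
    MutableShardRouting shard = it.next();
    if (!shard.primary()) {
        MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
        if (primary == null || !primary.active()) {
            continue; // leave the backup unassigned; a later reroute will pick it up
        }
    }
    // ... normal node selection and canAllocate checks continue here ...
}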


@@ -333,6 +333,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
@Override public void onTimeout(TimeValue timeValue) {
// just to be on the safe side, see if we can start it now?
if (start(true)) {
clusterService.remove(this);
return;
}
final PrimaryNotStartedActionException failure = new PrimaryNotStartedActionException(shardId, "Timeout waiting for [" + timeValue + "]");
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {


@@ -146,7 +146,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return count;
}
public List<MutableShardRouting> shardsOfType(ShardRoutingState state) {
public List<MutableShardRouting> shardsWithState(ShardRoutingState... state) {
List<MutableShardRouting> shards = newArrayList();
for (RoutingNode routingNode : this) {
shards.addAll(routingNode.shardsWithState(state));
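
With the rename to a varargs shardsWithState, callers can gather shards in one or more states in a single call. A short usage sketch based on how the updated tests below drive it (the multi-state call is only an illustration of the varargs form, not code from this commit):

// Collect every INITIALIZING shard across all routing nodes and mark them started in one step.
List<MutableShardRouting> initializing = routingNodes.shardsWithState(INITIALIZING);
routingTable = strategy.applyStartedShards(clusterState, initializing);

// The varargs signature also allows asking for several states at once (illustrative only):
List<MutableShardRouting> moving = routingNodes.shardsWithState(INITIALIZING, RELOCATING);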


@@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@@ -61,6 +62,7 @@ public class ShardsRoutingStrategy extends AbstractComponent {
if (!applyStartedShards(routingNodes, startedShardEntries)) {
return clusterState.routingTable();
}
reroute(routingNodes, clusterState.nodes());
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
}
@@ -74,6 +76,8 @@ public class ShardsRoutingStrategy extends AbstractComponent {
if (!applyFailedShards(routingNodes, failedShardEntries)) {
return clusterState.routingTable();
}
// If we reroute again, the failed shard will try to be assigned to the same node, which we do not do in applyFailedShards
// reroute(routingNodes, clusterState.nodes());
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
}
@@ -84,8 +88,14 @@ public class ShardsRoutingStrategy extends AbstractComponent {
*/
public RoutingTable reroute(ClusterState clusterState) {
RoutingNodes routingNodes = clusterState.routingNodes();
if (!reroute(routingNodes, clusterState.nodes())) {
return clusterState.routingTable();
}
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
}
Iterable<DiscoveryNode> dataNodes = clusterState.nodes().dataNodes().values();
private boolean reroute(RoutingNodes routingNodes, DiscoveryNodes nodes) {
Iterable<DiscoveryNode> dataNodes = nodes.dataNodes().values();
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
@@ -111,11 +121,7 @@ public class ShardsRoutingStrategy extends AbstractComponent {
// rebalance
changed |= rebalance(routingNodes);
if (!changed) {
return clusterState.routingTable();
}
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
return changed;
}
private boolean rebalance(RoutingNodes routingNodes) {
@@ -207,6 +213,13 @@ public class ShardsRoutingStrategy extends AbstractComponent {
int lastNode = 0;
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
if (!shard.primary()) {
// if it's a backup, only allocate it if the primary is active
MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
if (primary == null || !primary.active()) {
continue;
}
}
for (int i = 0; i < nodes.size(); i++) {
RoutingNode node = nodes.get(lastNode);
lastNode++;
@@ -229,12 +242,19 @@ public class ShardsRoutingStrategy extends AbstractComponent {
// allocate all the unassigned shards above the average per node.
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
MutableShardRouting shardRoutingEntry = it.next();
MutableShardRouting shard = it.next();
if (!shard.primary()) {
// if it's a backup, only allocate it if the primary is active
MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
if (primary == null || !primary.active()) {
continue;
}
}
// go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : routingNodes.nodesToShards().values()) {
if (routingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && routingNode.canAllocate(shardRoutingEntry)) {
if (routingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && routingNode.canAllocate(shard)) {
changed = true;
routingNode.add(shardRoutingEntry);
routingNode.add(shard);
it.remove();
break;
}
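
The guard above calls RoutingNodes.findPrimaryForBackup, whose body is not part of this excerpt. Purely as a hypothetical illustration of what such a lookup does (the implementation below is an assumption, not the commit's code), it would locate the assigned primary with the same shard id, if any:

// Hypothetical sketch only; the real RoutingNodes.findPrimaryForBackup is not shown in this diff.
public MutableShardRouting findPrimaryForBackup(MutableShardRouting backup) {
    for (RoutingNode routingNode : this) { // RoutingNodes implements Iterable<RoutingNode>
        for (MutableShardRouting shard : routingNode.shards()) { // shards() assumed to list the node's shards
            if (shard.primary() && shard.shardId().equals(backup.shardId())) {
                return shard; // the primary, whatever its state; the caller checks active()
            }
        }
    }
    return null; // primary is still unassigned
}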


@@ -0,0 +1,110 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class BackupAllocatedAfterPrimaryTests {
private final ESLogger logger = Loggers.getLogger(BackupAllocatedAfterPrimaryTests.class);
@Test public void testBackupIsAllocatedAfterPrimary() {
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(0).shards().get(1).currentNodeId(), nullValue());
logger.info("Adding one node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue());
logger.info("Start all the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
}


@@ -70,16 +70,10 @@ public class FailedShardsRoutingTests {
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shard (on node1)");
logger.info("Start the shards (primaries)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the backup shard (on node2)");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@@ -88,11 +82,28 @@
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Start the shards (backups)");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Adding third node and reroute");
@@ -201,7 +212,8 @@
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
// nothing will change, since primary shards have not started yet
assertThat(prevRoutingTable == routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
@@ -209,15 +221,14 @@
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
}
logger.info("Start the primary shard (on node1)");
logger.info("Start the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@@ -226,11 +237,10 @@
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Reroute, nothing should change");
@@ -238,7 +248,7 @@
routingTable = strategy.reroute(clusterState);
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Fail the backup shards");
logger.info("Fail backup shards on node2");
routingNodes = routingTable.routingNodes(metaData);
prevRoutingTable = routingTable;
List<MutableShardRouting> failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING);
@@ -254,7 +264,6 @@
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
}


@@ -47,7 +47,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SingleShardNoBackupsRoutingStrategyTests {
@@ -283,26 +283,7 @@ public class SingleShardNoBackupsRoutingStrategyTests {
logger.info("Marking the shard as started");
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsOfType(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable != prevRoutingTable, equalTo(true));
for (int i = 0; i < numberOfIndices; i++) {
assertThat(routingTable.index("test" + i).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).unassigned(), equalTo(false));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).primary(), equalTo(true));
// make sure we still have 2 shards initializing per node on the first 25 nodes
String nodeId = routingTable.index("test" + i).shard(0).shards().get(0).currentNodeId();
int nodeIndex = Integer.parseInt(nodeId.substring("node".length()));
assertThat(nodeIndex, lessThan(25));
}
logger.info("Perform another round of reroute after we started the shards (we don't do automatic reroute when applying started shards)");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@@ -351,7 +332,7 @@ public class SingleShardNoBackupsRoutingStrategyTests {
assertThat(routingTable.indicesRouting().size(), equalTo(numberOfIndices));
logger.info("Starting 3 nodes and retouring");
logger.info("Starting 3 nodes and rerouting");
clusterState = newClusterStateBuilder().state(clusterState)
.nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")))
.build();
@@ -385,26 +366,9 @@ public class SingleShardNoBackupsRoutingStrategyTests {
routingNodes = routingTable.routingNodes(metaData);
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsOfType(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
for (int i = 0; i < numberOfIndices; i++) {
assertThat(routingTable.index("test" + i).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(STARTED));
}
routingNodes = routingTable.routingNodes(metaData);
assertThat(routingNodes.numberOfShardsOfType(STARTED), equalTo(numberOfIndices));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), anyOf(equalTo(3), equalTo(4)));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), anyOf(equalTo(3), equalTo(4)));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), anyOf(equalTo(3), equalTo(4)));
logger.info("Now, reroute so we start the relocation process for even distribution (4 should be relocated)");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
for (int i = 0; i < numberOfIndices; i++) {
assertThat(routingTable.index("test" + i).shards().size(), equalTo(1));
@@ -418,7 +382,7 @@ public class SingleShardNoBackupsRoutingStrategyTests {
logger.info("Now, mark the relocated as started");
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsOfType(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// routingTable = strategy.reroute(new RoutingStrategyInfo(metaData, routingTable), nodes);


@@ -39,7 +39,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SingleShardOneBackupRoutingStrategyTests {
@@ -85,22 +85,13 @@ public class SingleShardOneBackupRoutingStrategyTests {
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue());
logger.info("Add another node and perform rerouting");
logger.info("Add another node and perform rerouting, nothing will happen since primary shards not started");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());


@@ -39,7 +39,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TenShardsOneBackupRoutingTests {
@@ -89,24 +89,13 @@ public class TenShardsOneBackupRoutingTests {
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
}
logger.info("Add another node and perform rerouting");
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2"));
}
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());