fix primary election on replica relocation

This commit is contained in:
kimchy 2010-08-19 14:59:50 +03:00
parent 9c0bbe9bb5
commit 22ea5e6608
11 changed files with 198 additions and 80 deletions

View File

@ -186,7 +186,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
public Builder removeReplica() {
for (int shardId : shards.keySet()) {
IndexShardRoutingTable indexShard = shards.get(shardId);
if (indexShard.backupsShards().isEmpty()) {
if (indexShard.replicaShards().isEmpty()) {
// nothing to do here!
return this;
}

View File

@ -107,14 +107,14 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return null;
}
public List<ShardRouting> backupsShards() {
List<ShardRouting> backupShards = Lists.newArrayListWithCapacity(2);
public List<ShardRouting> replicaShards() {
List<ShardRouting> replicaShards = Lists.newArrayListWithCapacity(2);
for (ShardRouting shardRouting : this) {
if (!shardRouting.primary()) {
backupShards.add(shardRouting);
replicaShards.add(shardRouting);
}
}
return backupShards;
return replicaShards;
}
int nextCounter() {

View File

@ -93,7 +93,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
public void moveFromPrimary() {
if (!primary) {
throw new IllegalShardRoutingStateException(this, "Already primary, can't move to backup");
throw new IllegalShardRoutingStateException(this, "Already primary, can't move to replica");
}
primary = false;
}

View File

@ -180,11 +180,11 @@ public class ShardsRoutingStrategy extends AbstractComponent {
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary() && !shardEntry.assignedToNode()) {
boolean elected = false;
// primary and not assigned, go over and find a backup that is assigned
// primary and not assigned, go over and find a replica that is assigned and active (since it might be relocating)
for (RoutingNode routingNode : routingNodes.nodesToShards().values()) {
for (MutableShardRouting shardEntry2 : routingNode.shards()) {
if (shardEntry.shardId().equals(shardEntry2.shardId())) {
if (shardEntry.shardId().equals(shardEntry2.shardId()) && shardEntry2.active()) {
assert shardEntry2.assignedToNode();
assert !shardEntry2.primary();

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class ElectReplicaAsPrimaryDuringRelocationTests {
private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class);
@Test public void testElectReplicaAsPrimaryDuringRelocation() {
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the replica shards");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(2));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2));
logger.info("Start another node and perform rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("find the replica shard that gets relocated");
IndexShardRoutingTable indexShardRoutingTable = null;
if (routingTable.index("test").shard(0).replicaShards().get(0).relocating()) {
indexShardRoutingTable = routingTable.index("test").shard(0);
} else if (routingTable.index("test").shard(1).replicaShards().get(0).relocating()) {
indexShardRoutingTable = routingTable.index("test").shard(1);
}
assertThat("failed to find relocating replica", indexShardRoutingTable, notNullValue());
logger.info("kill the node [{}] of the primary shard for the relocating replica", indexShardRoutingTable.primaryShard().currentNodeId());
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove(indexShardRoutingTable.primaryShard().currentNodeId())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("make sure all the primary shards are active");
assertThat(routingTable.index("test").shard(0).primaryShard().active(), equalTo(true));
assertThat(routingTable.index("test").shard(1).primaryShard().active(), equalTo(true));
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
}

View File

@ -83,9 +83,9 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Start the shards (backups)");
@ -101,9 +101,9 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Adding third node and reroute");
@ -201,9 +201,9 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Add another node and perform rerouting");
@ -220,9 +220,9 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Start the primary shards");
@ -238,9 +238,9 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Reroute, nothing should change");
@ -263,9 +263,9 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
// fail them again...
@ -280,10 +280,10 @@ public class FailedShardsRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
}

View File

@ -92,7 +92,7 @@ public class PrimaryElectionRoutingTests {
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
// verify where the primary is
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node3"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -82,9 +82,9 @@ public class ReplicaAllocatedAfterPrimaryTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue());
logger.info("Start all the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
@ -98,9 +98,9 @@ public class ReplicaAllocatedAfterPrimaryTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
}

View File

@ -81,9 +81,9 @@ public class SingleShardOneReplicaRoutingStrategyTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue());
logger.info("Add another node and perform rerouting, nothing will happen since primary shards not started");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
@ -105,10 +105,10 @@ public class SingleShardOneReplicaRoutingStrategyTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
logger.info("Reroute, nothing should change");
@ -128,9 +128,9 @@ public class SingleShardOneReplicaRoutingStrategyTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
logger.info("Kill node1, backup shard should become primary");
@ -145,10 +145,10 @@ public class SingleShardOneReplicaRoutingStrategyTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue());
logger.info("Start another node, backup shard should start initializing");
@ -163,10 +163,10 @@ public class SingleShardOneReplicaRoutingStrategyTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node3"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -84,9 +84,9 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
@ -110,10 +110,10 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), equalTo("node2"));
}
logger.info("Reroute, nothing should change");
@ -135,9 +135,9 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), equalTo("node2"));
}
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(10));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(10));

View File

@ -76,9 +76,9 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
logger.info("add another replica");
routingNodes = routingTable.routingNodes(clusterState.metaData());
@ -94,10 +94,10 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(UNASSIGNED));
logger.info("Add another node and start the added replica");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
@ -110,11 +110,11 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).currentNodeId(), equalTo("node3"));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
@ -126,11 +126,11 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).currentNodeId(), equalTo("node3"));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
logger.info("now remove a replica");
routingNodes = routingTable.routingNodes(clusterState.metaData());
@ -146,9 +146,9 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node3")));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node3")));
logger.info("do a reroute, should remain the same");
prevRoutingTable = routingTable;