diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
index 8fe4c88bff9..9f423e8d56b 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
@@ -254,6 +254,31 @@ public class MetaData implements Iterable<IndexMetaData> {
             return this;
         }
 
+        /**
+         * Applies the given replica count to the metadata of the named indices, or to every index held
+         * by this builder when no names are passed.
+         *
+         * @param numberOfReplicas the replica count to set on each selected index
+         * @param indices          names of the indices to update; null or empty selects all of them
+         * @return this builder
+         * @throws IndexMissingException if a named index is not held by this builder
+         */
+        public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) {
+            final boolean selectAll = indices == null || indices.length == 0;
+            // the names are copied into an array up front, before the loop below starts calling put(...)
+            final String[] targets = selectAll
+                    ? this.indices.map().keySet().toArray(new String[this.indices.map().keySet().size()])
+                    : indices;
+            for (String name : targets) {
+                IndexMetaData existing = this.indices.get(name);
+                if (existing == null) {
+                    throw new IndexMissingException(new Index(name));
+                }
+                put(IndexMetaData.newIndexMetaDataBuilder(existing).numberOfReplicas(numberOfReplicas).build());
+            }
+            return this;
+        }
+
         /**
          * Indicates that this cluster state has been recovered from the gateawy.
*/
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java
index e34c9d53879..2e8410a8c22 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 import java.io.Serializable;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class ImmutableShardRouting implements Streamable, Serializable, ShardRouting {
 
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
index 7e77004a1a1..cc903b8df2d 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
@@ -165,7 +165,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
         }
 
         /**
-         * Initializes a new empry index
+         * Initializes a new empty index
          */
         public Builder initializeEmpty(IndexMetaData indexMetaData) {
             for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
@@ -176,6 +176,57 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
             return this;
         }
 
+        /**
+         * Adds one more replica entry to every shard currently held by this builder by calling
+         * {@code addShard(shardId, null, false, ShardRoutingState.UNASSIGNED)} for each shard id.
+         */
+        public Builder addReplica() {
+            for (int shardId : shards.keySet()) {
+                addShard(shardId, null, false, ShardRoutingState.UNASSIGNED);
+            }
+            return this;
+        }
+
+        /**
+         * Removes one replica from every shard currently held by this builder. For each shard, a replica
+         * that is not assigned to a node is removed if there is one; otherwise the first replica found is
+         * removed. Shards that have no replicas are left untouched.
+         */
+        public Builder removeReplica() {
+            for (int shardId : shards.keySet()) {
+                IndexShardRoutingTable indexShard = shards.get(shardId);
+                if (indexShard.backupsShards().isEmpty()) {
+                    // no replica on this shard; move on so the remaining shards are still handled
+                    continue;
+                }
+                // re-add all the current ones
+                IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShard.shardId());
+                for (ShardRouting shardRouting : indexShard) {
+                    builder.addShard(new ImmutableShardRouting(shardRouting));
+                }
+                // pick the replica to drop: an unassigned one wins, otherwise the first replica seen
+                ShardRouting toRemove = null;
+                for (ShardRouting shardRouting : indexShard) {
+                    if (shardRouting.primary()) {
+                        continue;
+                    }
+                    if (!shardRouting.assignedToNode()) {
+                        toRemove = shardRouting;
+                        break;
+                    }
+                    if (toRemove == null) {
+                        toRemove = shardRouting;
+                    }
+                }
+                if (toRemove != null) {
+                    builder.removeShard(toRemove);
+                }
+                // only the value of an existing key is replaced, so no key is added or removed mid-iteration
+                shards.put(shardId, builder.build());
+            }
+            return this;
+        }
+
         public Builder addIndexShard(IndexShardRoutingTable indexShard) {
             shards.put(indexShard.shardId().id(), indexShard);
             return this;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
index e80ab7d0713..18f681ff325 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.elasticsearch.common.collect.Lists.*;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class IndexShardRoutingTable implements Iterable<ShardRouting> {
 
@@ -252,6 +252,14 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
             return this;
         }
 
+        /**
+         * Removes the given shard entry from the shards collected by this builder.
+         */
+        public Builder removeShard(ShardRouting shardEntry) {
+            shards.remove(shardEntry);
+            return this;
+        }
+
         public IndexShardRoutingTable build() {
             return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shards));
         }
diff --git 
a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index 3f245e2644e..e4a326ad567 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -153,6 +153,54 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
 
         private final Map<String, IndexRoutingTable> indicesRouting = newHashMap();
 
+        /**
+         * Registers every index routing table of the given routing table in this builder, keyed by index name.
+         */
+        public Builder routingTable(RoutingTable routingTable) {
+            for (IndexRoutingTable indexRoutingTable : routingTable) {
+                indicesRouting.put(indexRoutingTable.index(), indexRoutingTable);
+            }
+            return this;
+        }
+
+        /**
+         * Grows or shrinks the routing of the named indices to the requested number of replicas, or of
+         * every index held by this builder when no names are passed. The current replica count of an
+         * index is taken from the size of its shard 0 entry minus one (the primary).
+         *
+         * @param numberOfReplicas the target number of replicas per shard
+         * @param indices          names of the indices to update; null or empty selects all of them
+         * @return this builder
+         * @throws IndexMissingException if a named index has no routing table in this builder
+         */
+        public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) throws IndexMissingException {
+            if (indices == null || indices.length == 0) {
+                indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
+            }
+            for (String index : indices) {
+                IndexRoutingTable indexRoutingTable = indicesRouting.get(index);
+                if (indexRoutingTable == null) {
+                    throw new IndexMissingException(new Index(index));
+                }
+                int currentNumberOfReplicas = indexRoutingTable.shards().get(0).size() - 1; // remove the required primary
+                IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(index);
+                // re-add all the shards
+                for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+                    builder.addIndexShard(indexShardRoutingTable);
+                }
+                // at most one of the two loops below runs; neither runs when the count already matches
+                for (int i = currentNumberOfReplicas; i < numberOfReplicas; i++) {
+                    // add "empty" ones
+                    builder.addReplica();
+                }
+                for (int i = numberOfReplicas; i < currentNumberOfReplicas; i++) {
+                    builder.removeReplica();
+                }
+                indicesRouting.put(index, builder.build());
+            }
+            return this;
+        }
+
         public Builder add(IndexRoutingTable indexRoutingTable) {
             indexRoutingTable.validate();
             indicesRouting.put(indexRoutingTable.index(), indexRoutingTable);
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/UpdateNumberOfReplicasTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/UpdateNumberOfReplicasTests.java
new file mode 100644
index 00000000000..8379183d77f
--- /dev/null
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/UpdateNumberOfReplicasTests.java
@@ -0,0 +1,164 @@
+package org.elasticsearch.cluster.routing.strategy;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.testng.annotations.Test;
+
+import static org.elasticsearch.cluster.ClusterState.*;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
+import static org.elasticsearch.cluster.metadata.MetaData.*;
+import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
+import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
+import static org.elasticsearch.cluster.routing.RoutingTable.*;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class UpdateNumberOfReplicasTests {
+
+    private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class);
+
+
@Test public void testUpdateNumberOfReplicas() {
+        // Scenario: 1 shard with 1 replica started on node1/node2, grown to 2 replicas (node3), then shrunk back to 1.
+        ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
+        logger.info("Building initial routing table");
+
+        MetaData metaData = newMetaDataBuilder()
+                .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
+                .build();
+
+        RoutingTable routingTable = routingTable()
+                .add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
+                .build();
+
+        ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
+
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).shards().get(0).state(), equalTo(UNASSIGNED));
+        assertThat(routingTable.index("test").shard(0).shards().get(1).state(), equalTo(UNASSIGNED));
+        assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), nullValue());
+        assertThat(routingTable.index("test").shard(0).shards().get(1).currentNodeId(), nullValue());
+
+        logger.info("Adding two nodes and performing rerouting");
+        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
+
+        RoutingTable prevRoutingTable = routingTable;
+        routingTable = strategy.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+        logger.info("Start all the primary shards");
+        RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
+        prevRoutingTable = routingTable;
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+        logger.info("Start all the replica shards");
+        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        prevRoutingTable = routingTable;
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+        assertThat(routingTable, not(sameInstance(prevRoutingTable)));
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
+
+        logger.info("add another replica");
+        routingNodes = routingTable.routingNodes(clusterState.metaData()); // NOTE(review): overwritten before it is read
+        prevRoutingTable = routingTable;
+        routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(2).build();
+        metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(2).build();
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).metaData(metaData).build();
+
+        assertThat(clusterState.metaData().index("test").numberOfReplicas(), equalTo(2));
+
+        assertThat(routingTable, not(sameInstance(prevRoutingTable)));
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
+        assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(UNASSIGNED));
+
+        logger.info("Add another node and start the added replica");
+        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
+        prevRoutingTable = routingTable;
+        routingTable = strategy.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+        assertThat(routingTable, not(sameInstance(prevRoutingTable)));
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
+        assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(INITIALIZING));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(1).currentNodeId(), equalTo("node3"));
+
+        routingNodes = routingTable.routingNodes(clusterState.metaData());
+        prevRoutingTable = routingTable;
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+        assertThat(routingTable, not(sameInstance(prevRoutingTable)));
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
+        assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(1).currentNodeId(), equalTo("node3"));
+
+        logger.info("now remove a replica");
+        routingNodes = routingTable.routingNodes(clusterState.metaData()); // NOTE(review): never read after this point
+        prevRoutingTable = routingTable;
+        routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(1).build();
+        metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(1).build();
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).metaData(metaData).build();
+
+        assertThat(clusterState.metaData().index("test").numberOfReplicas(), equalTo(1));
+
+        assertThat(routingTable, not(sameInstance(prevRoutingTable)));
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
+        assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
+        assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node3")));
+
+        logger.info("do a reroute, should remain the same");
+        prevRoutingTable = routingTable;
+        routingTable = strategy.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
+
+        assertThat(routingTable, sameInstance(prevRoutingTable));
+    }
+
+    /** Creates a {@link DiscoveryNode} for the given node id, using the shared dummy transport address. */
+    private DiscoveryNode newNode(String nodeId) {
+        return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
+    }
+}