start work on being able to change the number of replicas at runtime

This commit is contained in:
kimchy 2010-08-05 11:00:02 +03:00
parent 860d8058b6
commit 6d3d9fd807
6 changed files with 270 additions and 3 deletions

View File

@ -254,6 +254,20 @@ public class MetaData implements Iterable<IndexMetaData> {
return this;
}
/**
 * Rewrites the metadata of the given indices so that each one carries the requested replica count.
 *
 * @param numberOfReplicas the replica count to store on every targeted index
 * @param indices          the names of the indices to update; when {@code null} or empty, every index
 *                         currently held by this builder is updated
 * @return this builder
 * @throws IndexMissingException if one of the named indices is not present in this builder
 */
public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) {
    String[] targets = indices;
    if (targets == null || targets.length == 0) {
        // no explicit selection: snapshot the names of all indices known to this builder
        targets = this.indices.map().keySet().toArray(new String[0]);
    }
    for (int i = 0; i < targets.length; i++) {
        String name = targets[i];
        IndexMetaData current = this.indices.get(name);
        if (current == null) {
            throw new IndexMissingException(new Index(name));
        }
        // copy the existing metadata, changing only the replica count, and store it back
        IndexMetaData updated = IndexMetaData.newIndexMetaDataBuilder(current)
                .numberOfReplicas(numberOfReplicas)
                .build();
        put(updated);
    }
    return this;
}
/**
* Indicates that this cluster state has been recovered from the gateway.
*/

View File

@ -29,7 +29,7 @@ import java.io.IOException;
import java.io.Serializable;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ImmutableShardRouting implements Streamable, Serializable, ShardRouting {

View File

@ -165,7 +165,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
}
/**
* Initializes a new empry index
* Initializes a new empty index
*/
public Builder initializeEmpty(IndexMetaData indexMetaData) {
for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
@ -176,6 +176,48 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return this;
}
/**
 * Adds one extra, unassigned, non-primary shard copy to every shard id currently
 * registered in this builder.
 *
 * @return this builder
 */
public Builder addReplica() {
    for (int id : shards.keySet()) {
        // presumably (shardId, nodeId, primary, state): no node yet, not a primary -- confirm against addShard
        addShard(id, null, false, ShardRoutingState.UNASSIGNED);
    }
    return this;
}
/**
 * Removes one replica (non-primary shard copy) from every shard group in this builder.
 * <p>
 * For each shard group the replica to drop is chosen in this order:
 * <ol>
 * <li>the first replica that is not assigned to a node, if there is one;</li>
 * <li>otherwise the first replica in iteration order.</li>
 * </ol>
 * Shard groups that have no replicas are left untouched; the remaining shard groups are
 * still processed.
 *
 * @return this builder
 */
public Builder removeReplica() {
    for (int shardId : shards.keySet()) {
        IndexShardRoutingTable indexShard = shards.get(shardId);
        if (indexShard.backupsShards().isEmpty()) {
            // nothing to remove for this shard; move on instead of returning, so that
            // the other shard groups are not silently skipped
            continue;
        }

        // single pass: remember the first replica as a fallback, but stop as soon as
        // a replica that is not assigned to a node is found
        ShardRouting toRemove = null;
        for (ShardRouting shardRouting : indexShard) {
            if (shardRouting.primary()) {
                continue;
            }
            if (!shardRouting.assignedToNode()) {
                toRemove = shardRouting;
                break;
            }
            if (toRemove == null) {
                toRemove = shardRouting;
            }
        }

        // re-add all the current ones, then drop the selected replica
        IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShard.shardId());
        for (ShardRouting shardRouting : indexShard) {
            builder.addShard(new ImmutableShardRouting(shardRouting));
        }
        if (toRemove != null) {
            builder.removeShard(toRemove);
        }

        // replaces the value of an existing key only, so the key set being iterated keeps its size
        shards.put(shardId, builder.build());
    }
    return this;
}
public Builder addIndexShard(IndexShardRoutingTable indexShard) {
shards.put(indexShard.shardId().id(), indexShard);
return this;

View File

@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class IndexShardRoutingTable implements Iterable<ShardRouting> {
@ -252,6 +252,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return this;
}
/**
 * Removes the given shard routing entry from the shards collected by this builder.
 * NOTE(review): the match presumably relies on the entry's equals() -- confirm against the
 * type of the shards collection.
 *
 * @param shardEntry the shard routing to remove
 * @return this builder
 */
public Builder removeShard(ShardRouting shardEntry) {
shards.remove(shardEntry);
return this;
}
/**
 * Creates an {@link IndexShardRoutingTable} for this builder's shard id from an immutable
 * copy of the shards collected so far.
 *
 * @return the newly created shard routing table
 */
public IndexShardRoutingTable build() {
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shards));
}

View File

@ -153,6 +153,48 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
// Routing tables collected by this builder, keyed by index name.
private final Map<String, IndexRoutingTable> indicesRouting = newHashMap();
/**
 * Seeds this builder with every index routing table of an existing routing table.
 * Entries already registered under the same index name are replaced.
 *
 * @param routingTable the routing table whose index entries are copied in
 * @return this builder
 */
public Builder routingTable(RoutingTable routingTable) {
    for (IndexRoutingTable entry : routingTable) {
        String indexName = entry.index();
        indicesRouting.put(indexName, entry);
    }
    return this;
}
/**
 * Grows or shrinks the routing of the given indices so that each shard group holds
 * {@code numberOfReplicas} replicas in addition to its primary.
 * <p>
 * The current replica count of an index is taken from shard 0 (its size minus the primary).
 * Missing replicas are added through {@link IndexRoutingTable.Builder#addReplica()} and
 * surplus ones are dropped through {@link IndexRoutingTable.Builder#removeReplica()}.
 * An index whose routing table has no shard 0 is left untouched.
 *
 * @param numberOfReplicas the target number of replicas per shard
 * @param indices          the names of the indices to update; when {@code null} or empty, every index
 *                         registered in this builder is updated
 * @return this builder
 * @throws IndexMissingException if one of the named indices is not registered in this builder
 */
public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) throws IndexMissingException {
    if (indices == null || indices.length == 0) {
        indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
    }
    for (String index : indices) {
        IndexRoutingTable indexRoutingTable = indicesRouting.get(index);
        if (indexRoutingTable == null) {
            throw new IndexMissingException(new Index(index));
        }
        IndexShardRoutingTable referenceShard = indexRoutingTable.shards().get(0);
        if (referenceShard == null) {
            // no shards routed for this index yet, so there is nothing to add replicas to or remove them from
            continue;
        }
        int currentNumberOfReplicas = referenceShard.size() - 1; // remove the required primary

        IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(index);
        // re-add all the shards
        for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
            builder.addIndexShard(indexShardRoutingTable);
        }

        if (currentNumberOfReplicas < numberOfReplicas) {
            // now, add "empty" ones
            int missing = numberOfReplicas - currentNumberOfReplicas;
            for (int i = 0; i < missing; i++) {
                builder.addReplica();
            }
        } else if (currentNumberOfReplicas > numberOfReplicas) {
            // a non-positive surplus simply makes the loop run zero times, so no extra guard is needed
            int surplus = currentNumberOfReplicas - numberOfReplicas;
            for (int i = 0; i < surplus; i++) {
                builder.removeReplica();
            }
        }
        indicesRouting.put(index, builder.build());
    }
    return this;
}
public Builder add(IndexRoutingTable indexRoutingTable) {
indexRoutingTable.validate();
indicesRouting.put(indexRoutingTable.index(), indexRoutingTable);

View File

@ -0,0 +1,164 @@
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.RoutingTable.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
 * Walks a single-shard index named "test" through a change of its replica count:
 * it starts with one replica, is raised to two replicas, and is then lowered back to one,
 * asserting the shape of the routing table after every step.
 *
 * @author kimchy (shay.banon)
 */
public class UpdateNumberOfReplicasTests {
private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class);
/**
 * Single scenario test; the steps depend on each other and must run in this order.
 */
@Test public void testUpdateNumberOfReplicas() {
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
logger.info("Building initial routing table");
// one index, one shard, one replica
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
// before any node exists: two shard copies, both unassigned and without a node
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(0).shards().get(1).currentNodeId(), nullValue());
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start all the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
// only the shards initializing on node1 are started here
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start all the replica shards");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// expected steady state: primary started on node1, the single replica started on node2
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
logger.info("add another replica");
// NOTE(review): this routingNodes value is reassigned below before it is read
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
// the replica count is changed in both the routing table and the metadata
routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(2).build();
metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(2).build();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).metaData(metaData).build();
assertThat(clusterState.metaData().index("test").numberOfReplicas(), equalTo(2));
// the existing copies are unchanged and the new replica shows up as unassigned
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(UNASSIGNED));
logger.info("Add another node and start the added replica");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// after the reroute the new replica is initializing on node3
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).currentNodeId(), equalTo("node3"));
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// all three copies are now started
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(3));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(1).currentNodeId(), equalTo("node3"));
logger.info("now remove a replica");
// NOTE(review): this routingNodes value is not read again in this test
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(1).build();
metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(1).build();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).metaData(metaData).build();
assertThat(clusterState.metaData().index("test").numberOfReplicas(), equalTo(1));
// back to two copies; the test accepts either node2 or node3 as the home of the surviving replica
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(0).backupsShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).backupsShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node3")));
logger.info("do a reroute, should remain the same");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// the reroute is expected to hand back the very same routing table instance
assertThat(prevRoutingTable != routingTable, equalTo(false));
}
// Creates a discovery node with the given id and a dummy transport address.
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
}