From 3117341f44a31276655d2efb6c1ef957e37405cc Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 21 Aug 2010 17:49:19 +0300 Subject: [PATCH] Shards Allocation: Only rebalance a shard if all its instances are already active, closes #331. --- .idea/dictionaries/kimchy.xml | 1 + .../cluster/routing/IndexRoutingTable.java | 11 ++ .../routing/IndexShardRoutingTable.java | 12 ++ .../cluster/routing/RoutingNodes.java | 23 ++- .../cluster/routing/RoutingTable.java | 9 ++ ...ferUnallocatedShardUnassignedStrategy.java | 4 +- .../strategy/ShardsRoutingStrategy.java | 23 ++- .../strategy/RebalanceAfterActiveTests.java | 153 ++++++++++++++++++ 8 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 58f9959b83e..f6c2978fbcd 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -101,6 +101,7 @@ queryparser rackspace rebalance + rebalancing regex reparse retrans diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 66f062cd16b..1c039a120fd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -31,9 +31,12 @@ import org.elasticsearch.common.util.concurrent.Immutable; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import static org.elasticsearch.common.collect.Lists.*; + /** * @author kimchy (Shay Banon) */ @@ -102,6 +105,14 @@ public class IndexRoutingTable implements Iterable { return shards.get(shardId); } + public List shardsWithState(ShardRoutingState... states) { + List shards = newArrayList(); + for (IndexShardRoutingTable shardRoutingTable : this) { + shards.addAll(shardRoutingTable.shardsWithState(states)); + } + return shards; + } + /** * A group shards iterator where each group ({@link ShardsIterator} * is an iterator across shard replication group. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 11faca550d3..c5f04d7c011 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -117,6 +117,18 @@ public class IndexShardRoutingTable implements Iterable { return replicaShards; } + public List shardsWithState(ShardRoutingState... states) { + List shards = newArrayList(); + for (ShardRouting shardEntry : this) { + for (ShardRoutingState state : states) { + if (shardEntry.state() == state) { + shards.add(shardEntry); + } + } + } + return shards; + } + int nextCounter() { return counter.getAndIncrement(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 6267b67ffe3..be291aa6d56 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -126,7 +126,7 @@ public class RoutingNodes implements Iterable { return nodesToShards.get(nodeId); } - public MutableShardRouting findPrimaryForBackup(MutableShardRouting shard) { + public MutableShardRouting findPrimaryForReplica(MutableShardRouting shard) { assert !shard.primary(); for (RoutingNode routingNode : nodesToShards.values()) { for (MutableShardRouting shardRouting : routingNode) { @@ -138,6 +138,27 @@ public class RoutingNodes implements Iterable { return null; } + public List shardsRoutingFor(ShardRouting shardRouting) { + return shardsRoutingFor(shardRouting.index(), shardRouting.id()); + } + + public List shardsRoutingFor(String index, int shardId) { + List shards = newArrayList(); + for (RoutingNode routingNode : this) { + for (MutableShardRouting shardRouting : routingNode) { + if (shardRouting.index().equals(index) && shardRouting.id() == shardId) { + shards.add(shardRouting); + } + } + } + for (MutableShardRouting shardRouting : unassigned) { + if (shardRouting.index().equals(index) && shardRouting.id() == shardId) { + shards.add(shardRouting); + } + } + return shards; + } + public int numberOfShardsOfType(ShardRoutingState state) { int count = 0; for (RoutingNode routingNode : this) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index e4a326ad567..d46f749e236 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.Maps.*; /** @@ -90,6 +91,14 @@ public class RoutingTable implements Iterable { return validation; } + public List shardsWithState(ShardRoutingState... states) { + List shards = newArrayList(); + for (IndexRoutingTable indexRoutingTable : this) { + shards.addAll(indexRoutingTable.shardsWithState(states)); + } + return shards; + } + /** * All the shards (replicas) for the provided indices. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java index fb53ebe2244..4d43fa99cf9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java @@ -107,7 +107,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent if (!shard.primary()) { // if its a backup, only allocate it if the primary is active - MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); + MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard); if (primary == null || !primary.active()) { continue; } @@ -209,7 +209,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent // if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it // note, since we replicate operations, this might not be the same (different flush intervals) if (!shard.primary()) { - MutableShardRouting primaryShard = routingNodes.findPrimaryForBackup(shard); + MutableShardRouting primaryShard = routingNodes.findPrimaryForReplica(shard); if (primaryShard != null && primaryShard.active()) { TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeStoreFileMetaData = nodesStoreFilesMetaData.nodesMap().get(primaryShard.currentNodeId()); if (primaryNodeStoreFileMetaData != null && primaryNodeStoreFileMetaData.storeFilesMetaData() != null && primaryNodeStoreFileMetaData.storeFilesMetaData().allocated()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java index 7aec8c3dffa..8e870818d0a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java @@ -158,6 +158,19 @@ public class ShardsRoutingStrategy extends AbstractComponent { boolean relocated = false; List activeShards = highRoutingNode.shardsWithState(STARTED); for (MutableShardRouting activeShard : activeShards) { + // we only relocate shards that all other shards within the replication group are active + List allShards = routingNodes.shardsRoutingFor(activeShard); + boolean ignoreShard = false; + for (MutableShardRouting allShard : allShards) { + if (!allShard.active()) { + ignoreShard = true; + break; + } + } + if (ignoreShard) { + continue; + } + if (lowRoutingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && lowRoutingNode.canAllocate(activeShard)) { changed = true; lowRoutingNode.add(new MutableShardRouting(activeShard.index(), activeShard.id(), @@ -217,13 +230,15 @@ public class ShardsRoutingStrategy extends AbstractComponent { int lastNode = 0; while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); + // if its a replica, only allocate it if the primary is active if (!shard.primary()) { - // if its a backup, only allocate it if the primary is active - MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); + MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard); if (primary == null || !primary.active()) { continue; } } + + // do the allocation, finding the least "busy" node for (int i = 0; i < nodes.size(); i++) { RoutingNode node = nodes.get(lastNode); lastNode++; @@ -247,9 +262,9 @@ public class ShardsRoutingStrategy extends AbstractComponent { // allocate all the unassigned shards above the average per node. for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext();) { MutableShardRouting shard = it.next(); + // if its a backup, only allocate it if the primary is active if (!shard.primary()) { - // if its a backup, only allocate it if the primary is active - MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); + MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard); if (primary == null || !primary.active()) { continue; } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java new file mode 100644 index 00000000000..777fa194276 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/strategy/RebalanceAfterActiveTests.java @@ -0,0 +1,153 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.strategy; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class RebalanceAfterActiveTests { + + private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class); + + @Test public void testRebalanceOnlyAfterAllShardsAreActive() { + + ShardsRoutingStrategy strategy = new ShardsRoutingStrategy(); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(5)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("start all the primary shards, replicas will start initializing"); + RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = routingTable.routingNodes(metaData); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3")).put(newNode("node4")).put(newNode("node5")).put(newNode("node6")).put(newNode("node7")).put(newNode("node8")).put(newNode("node9")).put(newNode("node10"))) + .build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = routingTable.routingNodes(metaData); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("start the replica shards, rebalancing should start"); + routingNodes = routingTable.routingNodes(clusterState.metaData()); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = routingTable.routingNodes(metaData); + + // we only allow one relocation at a time + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5)); + + logger.info("complete relocation, other half of relocation should happen"); + routingNodes = routingTable.routingNodes(clusterState.metaData()); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = routingTable.routingNodes(metaData); + + // we now only relocate 3, since 2 remain where they are! + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3)); + + logger.info("complete relocation, thats it!"); + routingNodes = routingTable.routingNodes(clusterState.metaData()); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = routingTable.routingNodes(metaData); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); + // make sure we have an even relocation + for (RoutingNode routingNode : routingNodes) { + assertThat(routingNode.shards().size(), equalTo(1)); + } + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE); + } +}