Shards Allocation: Only rebalance a shard if all its instances are already active, closes #331.

This commit is contained in:
kimchy 2010-08-21 17:49:19 +03:00
parent 38e6649a7e
commit 3117341f44
8 changed files with 229 additions and 7 deletions

View File

@ -101,6 +101,7 @@
<w>queryparser</w> <w>queryparser</w>
<w>rackspace</w> <w>rackspace</w>
<w>rebalance</w> <w>rebalance</w>
<w>rebalancing</w>
<w>regex</w> <w>regex</w>
<w>reparse</w> <w>reparse</w>
<w>retrans</w> <w>retrans</w>

View File

@ -31,9 +31,12 @@ import org.elasticsearch.common.util.concurrent.Immutable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.common.collect.Lists.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (Shay Banon)
*/ */
@ -102,6 +105,14 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return shards.get(shardId); return shards.get(shardId);
} }
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
List<ShardRouting> shards = newArrayList();
for (IndexShardRoutingTable shardRoutingTable : this) {
shards.addAll(shardRoutingTable.shardsWithState(states));
}
return shards;
}
/** /**
* A group shards iterator where each group ({@link ShardsIterator} * A group shards iterator where each group ({@link ShardsIterator}
* is an iterator across shard replication group. * is an iterator across shard replication group.

View File

@ -117,6 +117,18 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return replicaShards; return replicaShards;
} }
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
List<ShardRouting> shards = newArrayList();
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
if (shardEntry.state() == state) {
shards.add(shardEntry);
}
}
}
return shards;
}
int nextCounter() { int nextCounter() {
return counter.getAndIncrement(); return counter.getAndIncrement();
} }

View File

@ -126,7 +126,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return nodesToShards.get(nodeId); return nodesToShards.get(nodeId);
} }
public MutableShardRouting findPrimaryForBackup(MutableShardRouting shard) { public MutableShardRouting findPrimaryForReplica(MutableShardRouting shard) {
assert !shard.primary(); assert !shard.primary();
for (RoutingNode routingNode : nodesToShards.values()) { for (RoutingNode routingNode : nodesToShards.values()) {
for (MutableShardRouting shardRouting : routingNode) { for (MutableShardRouting shardRouting : routingNode) {
@ -138,6 +138,27 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return null; return null;
} }
public List<MutableShardRouting> shardsRoutingFor(ShardRouting shardRouting) {
return shardsRoutingFor(shardRouting.index(), shardRouting.id());
}
public List<MutableShardRouting> shardsRoutingFor(String index, int shardId) {
List<MutableShardRouting> shards = newArrayList();
for (RoutingNode routingNode : this) {
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.index().equals(index) && shardRouting.id() == shardId) {
shards.add(shardRouting);
}
}
}
for (MutableShardRouting shardRouting : unassigned) {
if (shardRouting.index().equals(index) && shardRouting.id() == shardId) {
shards.add(shardRouting);
}
}
return shards;
}
public int numberOfShardsOfType(ShardRoutingState state) { public int numberOfShardsOfType(ShardRoutingState state) {
int count = 0; int count = 0;
for (RoutingNode routingNode : this) { for (RoutingNode routingNode : this) {

View File

@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.Maps.*; import static org.elasticsearch.common.collect.Maps.*;
/** /**
@ -90,6 +91,14 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return validation; return validation;
} }
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
List<ShardRouting> shards = newArrayList();
for (IndexRoutingTable indexRoutingTable : this) {
shards.addAll(indexRoutingTable.shardsWithState(states));
}
return shards;
}
/** /**
* All the shards (replicas) for the provided indices. * All the shards (replicas) for the provided indices.
* *

View File

@ -107,7 +107,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
if (!shard.primary()) { if (!shard.primary()) {
// if its a backup, only allocate it if the primary is active // if its a backup, only allocate it if the primary is active
MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) { if (primary == null || !primary.active()) {
continue; continue;
} }
@ -209,7 +209,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
// if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it // if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it
// note, since we replicate operations, this might not be the same (different flush intervals) // note, since we replicate operations, this might not be the same (different flush intervals)
if (!shard.primary()) { if (!shard.primary()) {
MutableShardRouting primaryShard = routingNodes.findPrimaryForBackup(shard); MutableShardRouting primaryShard = routingNodes.findPrimaryForReplica(shard);
if (primaryShard != null && primaryShard.active()) { if (primaryShard != null && primaryShard.active()) {
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeStoreFileMetaData = nodesStoreFilesMetaData.nodesMap().get(primaryShard.currentNodeId()); TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeStoreFileMetaData = nodesStoreFilesMetaData.nodesMap().get(primaryShard.currentNodeId());
if (primaryNodeStoreFileMetaData != null && primaryNodeStoreFileMetaData.storeFilesMetaData() != null && primaryNodeStoreFileMetaData.storeFilesMetaData().allocated()) { if (primaryNodeStoreFileMetaData != null && primaryNodeStoreFileMetaData.storeFilesMetaData() != null && primaryNodeStoreFileMetaData.storeFilesMetaData().allocated()) {

View File

@ -158,6 +158,19 @@ public class ShardsRoutingStrategy extends AbstractComponent {
boolean relocated = false; boolean relocated = false;
List<MutableShardRouting> activeShards = highRoutingNode.shardsWithState(STARTED); List<MutableShardRouting> activeShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting activeShard : activeShards) { for (MutableShardRouting activeShard : activeShards) {
// we only relocate shards that all other shards within the replication group are active
List<MutableShardRouting> allShards = routingNodes.shardsRoutingFor(activeShard);
boolean ignoreShard = false;
for (MutableShardRouting allShard : allShards) {
if (!allShard.active()) {
ignoreShard = true;
break;
}
}
if (ignoreShard) {
continue;
}
if (lowRoutingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && lowRoutingNode.canAllocate(activeShard)) { if (lowRoutingNode.canAllocate(routingNodes.metaData(), routingNodes.routingTable()) && lowRoutingNode.canAllocate(activeShard)) {
changed = true; changed = true;
lowRoutingNode.add(new MutableShardRouting(activeShard.index(), activeShard.id(), lowRoutingNode.add(new MutableShardRouting(activeShard.index(), activeShard.id(),
@ -217,13 +230,15 @@ public class ShardsRoutingStrategy extends AbstractComponent {
int lastNode = 0; int lastNode = 0;
while (unassignedIterator.hasNext()) { while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next(); MutableShardRouting shard = unassignedIterator.next();
// if its a replica, only allocate it if the primary is active
if (!shard.primary()) { if (!shard.primary()) {
// if its a backup, only allocate it if the primary is active MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
if (primary == null || !primary.active()) { if (primary == null || !primary.active()) {
continue; continue;
} }
} }
// do the allocation, finding the least "busy" node
for (int i = 0; i < nodes.size(); i++) { for (int i = 0; i < nodes.size(); i++) {
RoutingNode node = nodes.get(lastNode); RoutingNode node = nodes.get(lastNode);
lastNode++; lastNode++;
@ -247,9 +262,9 @@ public class ShardsRoutingStrategy extends AbstractComponent {
// allocate all the unassigned shards above the average per node. // allocate all the unassigned shards above the average per node.
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) { for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
MutableShardRouting shard = it.next(); MutableShardRouting shard = it.next();
// if its a backup, only allocate it if the primary is active
if (!shard.primary()) { if (!shard.primary()) {
// if its a backup, only allocate it if the primary is active MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard);
if (primary == null || !primary.active()) { if (primary == null || !primary.active()) {
continue; continue;
} }

View File

@ -0,0 +1,153 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.strategy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class RebalanceAfterActiveTests {
private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class);
@Test public void testRebalanceOnlyAfterAllShardsAreActive() {
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(5));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
}
logger.info("start two nodes and fully start the shards");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("start all the primary shards, replicas will start initializing");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}
logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node3")).put(newNode("node4")).put(newNode("node5")).put(newNode("node6")).put(newNode("node7")).put(newNode("node8")).put(newNode("node9")).put(newNode("node10")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
}
logger.info("start the replica shards, rebalancing should start");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
// we only allow one relocation at a time
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
logger.info("complete relocation, other half of relocation should happen");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
// we now only relocate 3, since 2 remain where they are!
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
logger.info("complete relocation, thats it!");
routingNodes = routingTable.routingNodes(clusterState.metaData());
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
// make sure we have an even relocation
for (RoutingNode routingNode : routingNodes) {
assertThat(routingNode.shards().size(), equalTo(1));
}
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
}