Avoid placing a shard replica on the same machine as the shard itself, closes #1680.

This commit is contained in:
Shay Banon 2012-02-08 15:39:01 +02:00
parent 23ca0d2cb2
commit 457f0a4266
7 changed files with 132 additions and 2 deletions

View File

@ -33,9 +33,15 @@ import java.util.List;
*/
public class SameShardAllocationDecider extends AllocationDecider {
/**
 * Settings key of the boolean flag that enables the host-level check in this decider.
 * The constructor reads it with a default of {@code false}.
 */
public static final String SAME_HOST_SETTING = "cluster.routing.allocation.same_shard.host";
// Value of SAME_HOST_SETTING captured at construction time; when true, canAllocate also
// rejects nodes whose host already holds a copy of the shard.
private final boolean sameHost;
/**
 * Creates the decider and captures the {@link #SAME_HOST_SETTING} flag from the given settings.
 *
 * @param settings settings passed on to the superclass and used to look up
 *                 {@link #SAME_HOST_SETTING}; the flag is {@code false} when the key is absent
 */
@Inject
public SameShardAllocationDecider(Settings settings) {
super(settings);
// the host-level check is opt-in: it stays disabled unless the setting is explicitly set
this.sameHost = settings.getAsBoolean(SAME_HOST_SETTING, false);
}
@Override
@ -47,6 +53,25 @@ public class SameShardAllocationDecider extends AllocationDecider {
return Decision.NO;
}
}
// Optional host-level check (enabled through SAME_HOST_SETTING): reject the allocation when any
// routing node on the same host as the target node already holds a shard with the same shard id.
if (sameHost) {
// the target's address is needed for the comparison below, so skip the check when node() is null
if (node.node() != null) {
for (RoutingNode checkNode : allocation.routingNodes()) {
// a routing node whose node() is null has no address to compare against
if (checkNode.node() == null) {
continue;
}
// check if it is on the same host as the one we want to allocate to
if (!checkNode.node().address().sameHost(node.node().address())) {
continue;
}
shards = checkNode.shards();
for (int i = 0; i < shards.size(); i++) {
if (shards.get(i).shardId().equals(shardRouting.shardId())) {
// a copy of this shard already lives on a node of the same host
return Decision.NO;
}
}
}
}
}
return Decision.YES;
}
}

View File

@ -44,6 +44,11 @@ public class DummyTransportAddress implements TransportAddress {
return false;
}
/**
 * Always reports {@code false}: this implementation never considers itself to be on the
 * same host as {@code otherAddress}, whatever address is passed in.
 *
 * @param otherAddress the address to compare against (ignored)
 * @return {@code false} unconditionally
 */
@Override
public boolean sameHost(TransportAddress otherAddress) {
return false;
}
/**
 * Intentionally empty: nothing is read from the stream for this address type.
 *
 * @param in the input stream (not consumed)
 * @throws IOException declared by the overridden method; never thrown by this empty body
 */
@Override
public void readFrom(StreamInput in) throws IOException {
}

View File

@ -30,8 +30,6 @@ import java.net.InetSocketAddress;
/**
* A transport address used for IP socket address (wraps {@link java.net.InetSocketAddress}).
*
*
*/
public class InetSocketTransportAddress implements TransportAddress {
@ -80,6 +78,20 @@ public class InetSocketTransportAddress implements TransportAddress {
return false;
}
/**
 * Tells whether {@code other} points at the same host as this address.
 * <p>
 * Only another {@link InetSocketTransportAddress} can match. When both socket addresses are
 * resolved, their IP addresses are compared; as soon as either one is unresolved, the
 * comparison falls back to the host names.
 *
 * @param other the address to compare against
 * @return {@code true} if both addresses are considered to be on the same host
 */
@Override
public boolean sameHost(TransportAddress other) {
    if (other instanceof InetSocketTransportAddress) {
        InetSocketAddress theirs = ((InetSocketTransportAddress) other).address();
        // both sides resolved: compare the actual IP addresses
        if (!address.isUnresolved() && !theirs.isUnresolved()) {
            return theirs.getAddress().equals(address.getAddress());
        }
        // at least one side is unresolved: host names are the only common ground
        String ownName = address.getHostName();
        String theirName = theirs.getHostName();
        if (ownName == null || theirName == null) {
            return false;
        }
        return ownName.equals(theirName);
    }
    return false;
}
/**
 * Returns the wrapped socket address.
 *
 * @return the {@link InetSocketAddress} held by this transport address
 */
public InetSocketAddress address() {
return this.address;
}

View File

@ -52,6 +52,11 @@ public class LocalTransportAddress implements TransportAddress {
return id.equals(otherAddress);
}
/**
 * Always reports {@code true}, without looking at {@code other}.
 * <p>
 * NOTE(review): the type of {@code other} is not inspected, so this also answers {@code true}
 * for a non-local address - presumably acceptable because local addresses are only used
 * within a single JVM; confirm against callers.
 *
 * @param other the address to compare against (ignored)
 * @return {@code true} unconditionally
 */
@Override
public boolean sameHost(TransportAddress other) {
return true;
}
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readUTF();

View File

@ -31,4 +31,6 @@ public interface TransportAddress extends Streamable, Serializable {
short uniqueAddressTypeId();
boolean match(String otherAddress);
/**
 * Tells whether this address and {@code other} are considered to be on the same host.
 *
 * @param other the address to compare against
 * @return {@code true} if both addresses are treated as belonging to the same host
 */
boolean sameHost(TransportAddress other);
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import java.util.Map;
@ -30,6 +31,10 @@ public class RoutingAllocationTests {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
/**
 * Test helper that builds a {@link DiscoveryNode} with the given id and transport address.
 *
 * @param nodeId  id of the node to create
 * @param address transport address to assign to the node
 * @return the newly created node
 */
public static DiscoveryNode newNode(String nodeId, TransportAddress address) {
return new DiscoveryNode(nodeId, address);
}
/**
 * Test helper that builds a {@link DiscoveryNode} with the given id and attributes, using
 * {@link DummyTransportAddress#INSTANCE} as its address.
 *
 * @param nodeId     id of the node to create
 * @param attributes attributes to attach to the node
 * @return the newly created node
 */
public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
// the first constructor argument is passed as an empty string (presumably the node name - TODO confirm)
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes);
}

View File

@ -0,0 +1,76 @@
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
 * Allocation test for {@link SameShardAllocationDecider} with
 * {@link SameShardAllocationDecider#SAME_HOST_SETTING} enabled: copies of the same shard must
 * not end up on two nodes that share a host.
 */
@Test
public class SameShardRoutingTests {
private final ESLogger logger = Loggers.getLogger(SameShardRoutingTests.class);
/**
 * Starts with two nodes on the same host and checks that only two shards get allocated;
 * after a third node on a different host joins, the remaining two shards must initialize
 * on that new node.
 */
@Test
public void sameHost() {
// enable the host-level check in the allocation service under test
AllocationService strategy = new AllocationService(settingsBuilder().put(SameShardAllocationDecider.SAME_HOST_SETTING, true).build());
// one index with 2 shards and 1 replica each, i.e. 4 shard copies in total
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding two nodes with the same host");
// both nodes use the identical host name "test1", so sameHost() between them is expected to be true
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1", new InetSocketTransportAddress("test1", 80)))
.put(newNode("node2", new InetSocketTransportAddress("test1", 80)))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// only 2 of the 4 shard copies may start initializing while all nodes share one host
assertThat(clusterState.readOnlyRoutingNodes().numberOfShardsOfType(ShardRoutingState.INITIALIZING), equalTo(2));
logger.info("--> start all primary shards, no replica will be started since its on the same host");
routingTable = strategy.applyStartedShards(clusterState, clusterState.readOnlyRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// the two initializing shards are now started and nothing new was allocated
assertThat(clusterState.readOnlyRoutingNodes().numberOfShardsOfType(ShardRoutingState.STARTED), equalTo(2));
assertThat(clusterState.readOnlyRoutingNodes().numberOfShardsOfType(ShardRoutingState.INITIALIZING), equalTo(0));
logger.info("--> add another node, with a different host, replicas will be allocating");
// node3 uses a different host name ("test2"), so it is eligible for the remaining copies
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.putAll(clusterState.nodes())
.put(newNode("node3", new InetSocketTransportAddress("test2", 80)))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(clusterState.readOnlyRoutingNodes().numberOfShardsOfType(ShardRoutingState.STARTED), equalTo(2));
assertThat(clusterState.readOnlyRoutingNodes().numberOfShardsOfType(ShardRoutingState.INITIALIZING), equalTo(2));
// every newly initializing shard must have been placed on the node with the different host
for (MutableShardRouting shardRouting : clusterState.readOnlyRoutingNodes().shardsWithState(INITIALIZING)) {
assertThat(shardRouting.currentNodeId(), equalTo("node3"));
}
}
}