automatic preference when using awareness attributes

This commit is contained in:
Shay Banon 2011-09-21 23:09:44 +03:00
parent 3028d5a7a1
commit 44efcca108
5 changed files with 173 additions and 10 deletions

View File

@ -19,7 +19,10 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,7 +31,10 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.collect.Lists.*;
@ -264,6 +270,86 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
static class AttributesKey {
final String[] attributes;
AttributesKey(String[] attributes) {
this.attributes = attributes;
}
@Override public int hashCode() {
return Arrays.hashCode(attributes);
}
@Override public boolean equals(Object obj) {
return Arrays.equals(attributes, ((AttributesKey) obj).attributes);
}
}
static class AttributesRoutings {
public final ImmutableList<ShardRouting> withSameAttribute;
public final ImmutableList<ShardRouting> withoutSameAttribute;
public final int totalSize;
AttributesRoutings(ImmutableList<ShardRouting> withSameAttribute, ImmutableList<ShardRouting> withoutSameAttribute) {
this.withSameAttribute = withSameAttribute;
this.withoutSameAttribute = withoutSameAttribute;
this.totalSize = withoutSameAttribute.size() + withSameAttribute.size();
}
}
private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = ImmutableMap.of();
private final Object shardsByAttributeMutex = new Object();
public ShardIterator preferAttributesActiveShardsIt(String[] attributes, DiscoveryNodes nodes) {
return preferAttributesActiveShardsIt(attributes, nodes, counter.incrementAndGet());
}
public ShardIterator preferAttributesActiveShardsIt(String[] attributes, DiscoveryNodes nodes, int index) {
AttributesKey key = new AttributesKey(attributes);
AttributesRoutings shardRoutings = activeShardsByAttributes.get(key);
if (shardRoutings == null) {
synchronized (shardsByAttributeMutex) {
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(activeShards);
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (String attribute : attributes) {
String localAttributeValue = nodes.localNode().attributes().get(attribute);
if (localAttributeValue == null) {
continue;
}
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
ShardRouting fromShard = iterator.next();
if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) {
iterator.remove();
to.add(fromShard);
}
}
}
shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from));
activeShardsByAttributes = MapBuilder.newMapBuilder(activeShardsByAttributes).put(key, shardRoutings).immutableMap();
}
}
// we now randomize, once between the ones that have the same attributes, and once for the ones that don't
// we don't want to mix between the two!
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(shardRoutings.totalSize);
index = Math.abs(index);
for (int i = 0; i < shardRoutings.withSameAttribute.size(); i++) {
int loc = (index + i) % shardRoutings.withSameAttribute.size();
ShardRouting shardRouting = shardRoutings.withSameAttribute.get(loc);
ordered.add(shardRouting);
}
for (int i = 0; i < shardRoutings.withoutSameAttribute.size(); i++) {
int loc = (index + i) % shardRoutings.withoutSameAttribute.size();
ShardRouting shardRouting = shardRoutings.withoutSameAttribute.get(loc);
ordered.add(shardRouting);
}
return new PlainShardIterator(shardId, ordered);
}
public ShardRouting primaryShard() {
return primary;
}

View File

@ -50,6 +50,10 @@ public class AwarenessAllocationDecider extends AllocationDecider {
}
}
public String[] awarenessAttributes() {
return this.awarenessAttributes;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true) ? Decision.YES : Decision.NO;
}

View File

@ -22,10 +22,12 @@ package org.elasticsearch.cluster.routing.operation.plain;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
@ -52,10 +54,13 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
private final boolean useType;
@Inject public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction) {
private final AwarenessAllocationDecider awarenessAllocationDecider;
@Inject public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction, AwarenessAllocationDecider awarenessAllocationDecider) {
super(indexSettings);
this.hashFunction = hashFunction;
this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
this.awarenessAllocationDecider = awarenessAllocationDecider;
}
@Override public ShardIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
@ -67,11 +72,11 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
}
@Override public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) throws IndexMissingException, IndexShardMissingException {
return preferenceActiveShardIterator(shards(clusterState, index, type, id, routing), clusterState.nodes().localNodeId(), preference);
return preferenceActiveShardIterator(shards(clusterState, index, type, id, routing), clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
}
@Override public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) throws IndexMissingException, IndexShardMissingException {
return preferenceActiveShardIterator(shards(clusterState, index, shardId), clusterState.nodes().localNodeId(), preference);
return preferenceActiveShardIterator(shards(clusterState, index, shardId), clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
}
@Override public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException {
@ -149,7 +154,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, they will override one another
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference));
}
}
}
@ -160,25 +165,35 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference));
}
}
return new GroupShardsIterator(set);
}
}
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String nodeId, @Nullable String preference) {
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String nodeId, DiscoveryNodes nodes, @Nullable String preference) {
if (preference == null) {
return indexShard.activeShardsRandomIt();
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeShardsRandomIt();
} else {
return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes);
}
}
if ("_local".equals(preference)) {
return indexShard.preferNodeShardsIt(nodeId);
return indexShard.preferNodeActiveShardsIt(nodeId);
}
if ("_primary".equals(preference)) {
return indexShard.primaryShardIt();
}
// if not, then use it as the index
return indexShard.shardsIt(DjbHashFunction.DJB_HASH(preference));
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeShardsIt(DjbHashFunction.DJB_HASH(preference));
} else {
return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes, DjbHashFunction.DJB_HASH(preference));
}
}
public IndexMetaData indexMetaData(ClusterState clusterState, String index) {

View File

@ -19,18 +19,26 @@
package org.elasticsearch.cluster.structure;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.index.shard.ShardId;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@ -209,4 +217,53 @@ public class RoutingIteratorTests {
assertThat(shardRouting1, not(sameInstance(shardRouting2)));
assertThat(shardRouting1, sameInstance(shardRouting3));
}
@Test public void testAttributePreferenceRouting() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id,zone")
.build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1", ImmutableMap.of("rack_id", "rack_1", "zone", "zone1")))
.put(newNode("node2", ImmutableMap.of("rack_id", "rack_2", "zone", "zone2")))
.localNodeId("node1")
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// after all are started, check routing iteration
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveShardsIt(new String[]{"rack_id"}, clusterState.nodes());
ShardRouting shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveShardsIt(new String[]{"rack_id"}, clusterState.nodes());
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRouting;
import org.elasticsearch.common.collect.ImmutableMap;
@ -399,7 +400,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
public static class UnevenOperationRoutingStrategy extends PlainOperationRouting {
@Inject public UnevenOperationRoutingStrategy(Settings settings) {
super(settings, null);
super(settings, null, new AwarenessAllocationDecider(Builder.EMPTY_SETTINGS));
}
@Override protected int hash(String routing) {