Search Preference: Add _shards prefix to explicitly list shards, and add _prefer_node option, closes #1904

This commit is contained in:
Shay Banon 2012-05-03 01:12:22 +03:00
parent 8e6d3753cc
commit 07f3ed05b0
3 changed files with 122 additions and 7 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -71,6 +72,14 @@ public class AwarenessAllocationDecider extends AllocationDecider {
private Map<String, String[]> forcedAwarenessAttributes;
public AwarenessAllocationDecider() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public AwarenessAllocationDecider(Settings settings) {
this(settings, new NodeSettingsService(settings));
}
@Inject
public AwarenessAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -163,7 +164,10 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, they will override one another
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference));
ShardIterator iterator = preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
if (iterator != null) {
set.add(iterator);
}
}
}
}
@ -174,7 +178,10 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference));
ShardIterator iterator = preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
if (iterator != null) {
set.add(iterator);
}
}
}
return new GroupShardsIterator(set);
@ -191,6 +198,42 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
}
}
if (preference.charAt(0) == '_') {
if (preference.startsWith("_shards:")) {
// starts with _shards, so execute on specific ones
int index = preference.indexOf(';');
String shards;
if (index == -1) {
shards = preference.substring("_shards:".length());
} else {
shards = preference.substring("_shards:".length(), index);
}
String[] ids = Strings.splitStringByCommaToArray(shards);
boolean found = false;
for (String id : ids) {
if (Integer.parseInt(id) == indexShard.shardId().id()) {
found = true;
break;
}
}
if (!found) {
return null;
}
// no more preference
if (index == -1 || index == preference.length() - 1) {
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeShardsRandomIt();
} else {
return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes);
}
} else {
// update the preference and continue
preference = preference.substring(index + 1);
}
}
if (preference.startsWith("_prefer_node:")) {
return indexShard.preferNodeActiveShardsIt(preference.substring("_prefer_node:".length()));
}
if ("_local".equals(preference)) {
return indexShard.preferNodeActiveShardsIt(localNodeId);
}

View File

@ -23,11 +23,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRouting;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests;
import org.testng.annotations.Test;
@ -39,7 +40,6 @@ import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@ -300,4 +300,67 @@ public class RoutingIteratorTests {
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
}
@Test
public void testShardsAndPreferNodeRouting() {
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(RoutingAllocationTests.newNode("node1"))
.put(RoutingAllocationTests.newNode("node2"))
.localNodeId("node1")
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
PlainOperationRouting operationRouting = new PlainOperationRouting(ImmutableSettings.Builder.EMPTY_SETTINGS, new DjbHashFunction(), new AwarenessAllocationDecider());
GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, new String[]{"test"}, null, null, "_shards:0");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, new String[]{"test"}, null, null, "_shards:1");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(1));
//check node preference, first without preference to see they switch
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, new String[]{"test"}, null, null, "_shards:0;");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
String firstRoundNodeId = shardIterators.iterator().next().nextOrNull().currentNodeId();
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, new String[]{"test"}, null, null, "_shards:0");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), not(equalTo(firstRoundNodeId)));
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, new String[]{"test"}, null, null, "_shards:0;_prefer_node:node1");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1"));
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, new String[]{"test"}, null, null, "_shards:0;_prefer_node:node1");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1"));
}
}