5925 - Allow node specification in preference

-Allow node selector api's with new preference
ONLY_NODES ( selector apis like https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html)

-Update documentation
This commit is contained in:
Nirmal Chidambaram 2015-06-01 17:25:04 -05:00
parent 0a526be344
commit 72a9d34eb8
6 changed files with 104 additions and 1 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator; import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -330,6 +331,33 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered); return new PlainShardIterator(shardId, ordered);
} }
/**
* Returns shards based on nodeAttributes given such as node name , node attribute, node IP
* Supports node specifications in cluster API
*
* @param nodeAttribute
* @param discoveryNodes
*/
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttribute, DiscoveryNodes discoveryNodes) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttribute));
for (ShardRouting shardRouting : activeShards) {
if (selectedNodes.contains(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
for (ShardRouting shardRouting : allInitializingShards) {
if (selectedNodes.contains(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
if (ordered.isEmpty()) {
throw new IllegalArgumentException("No data node with critera [" + nodeAttribute + "] found");
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) { public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion // fill it in a randomized fashion

View File

@ -183,6 +183,9 @@ public class OperationRouting extends AbstractComponent {
String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1); String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
ensureNodeIdExists(nodes, nodeId); ensureNodeIdExists(nodes, nodeId);
return indexShard.onlyNodeActiveInitializingShardsIt(nodeId); return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
case ONLY_NODES:
String nodeAttribute = preference.substring(Preference.ONLY_NODES.type().length() + 1);
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttribute, nodes);
default: default:
throw new IllegalArgumentException("unknown preference [" + preferenceType + "]"); throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
} }

View File

@ -57,7 +57,12 @@ public enum Preference {
/** /**
* Route to specific node only * Route to specific node only
*/ */
ONLY_NODE("_only_node"); ONLY_NODE("_only_node"),
/**
* Route to only node with attribute
*/
ONLY_NODES("_only_nodes");
private final String type; private final String type;
@ -97,6 +102,8 @@ public enum Preference {
case "_only_local": case "_only_local":
case "_onlyLocal": case "_onlyLocal":
return ONLY_LOCAL; return ONLY_LOCAL;
case "_only_nodes":
return ONLY_NODES;
default: default:
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]"); throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
} }

View File

@ -262,6 +262,63 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
assertThat(shardRouting.currentNodeId(), equalTo("node2")); assertThat(shardRouting.currentNodeId(), equalTo("node2"));
} }
@Test
public void testNodeSelectorRouting(){
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.build());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("fred","node1", ImmutableMap.of("disk", "ebs")))
.put(newNode("barney","node2", ImmutableMap.of("disk", "ephemeral")))
.localNodeId("node1")
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
ShardsIterator shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("disk:ebs",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("dis*:eph*",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("bar*",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
try {
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("welma", clusterState.nodes());
fail("shouldve raised illegalArgumentException");
} catch (IllegalArgumentException illegal) {
//expected exception
}
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
}
@Test @Test
public void testShardsAndPreferNodeRouting() { public void testShardsAndPreferNodeRouting() {

View File

@ -105,6 +105,10 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
} }
public static DiscoveryNode newNode(String nodeName,String nodeId, Map<String, String> attributes) {
return new DiscoveryNode(nodeName, nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
public static DiscoveryNode newNode(String nodeId, Version version) { public static DiscoveryNode newNode(String nodeId, Version version) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version); return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version);
} }

View File

@ -33,6 +33,10 @@ The `preference` is a query string parameter which can be set to:
and `3` in this case). This preference can be combined with other and `3` in this case). This preference can be combined with other
preferences but it has to appear first: `_shards:2,3;_primary` preferences but it has to appear first: `_shards:2,3;_primary`
`_only_nodes`::
Restricts the operation to nodes specified in node specification
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html
Custom (string) value:: Custom (string) value::
A custom value will be used to guarantee that A custom value will be used to guarantee that
the same shards will be used for the same custom value. This can help the same shards will be used for the same custom value. This can help