Merge pull request #11464 from nirmalc/nodes-preference

Search `preference` based on node specification
This commit is contained in:
Simon Willnauer 2015-06-17 12:33:51 +02:00
commit 0434ecfb03
6 changed files with 104 additions and 1 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -330,6 +331,33 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
/**
* Returns shards based on nodeAttributes given such as node name , node attribute, node IP
* Supports node specifications in cluster API
*
* @param nodeAttribute
* @param discoveryNodes
*/
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttribute, DiscoveryNodes discoveryNodes) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttribute));
for (ShardRouting shardRouting : activeShards) {
if (selectedNodes.contains(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
for (ShardRouting shardRouting : allInitializingShards) {
if (selectedNodes.contains(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
if (ordered.isEmpty()) {
throw new IllegalArgumentException("No data node with critera [" + nodeAttribute + "] found");
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion

View File

@ -183,6 +183,9 @@ public class OperationRouting extends AbstractComponent {
String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
ensureNodeIdExists(nodes, nodeId);
return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
case ONLY_NODES:
String nodeAttribute = preference.substring(Preference.ONLY_NODES.type().length() + 1);
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttribute, nodes);
default:
throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
}

View File

@ -57,7 +57,12 @@ public enum Preference {
/**
* Route to specific node only
*/
ONLY_NODE("_only_node");
ONLY_NODE("_only_node"),
/**
* Route to only node with attribute
*/
ONLY_NODES("_only_nodes");
private final String type;
@ -97,6 +102,8 @@ public enum Preference {
case "_only_local":
case "_onlyLocal":
return ONLY_LOCAL;
case "_only_nodes":
return ONLY_NODES;
default:
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
}

View File

@ -262,6 +262,63 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
}
@Test
public void testNodeSelectorRouting(){
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.build());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("fred","node1", ImmutableMap.of("disk", "ebs")))
.put(newNode("barney","node2", ImmutableMap.of("disk", "ephemeral")))
.localNodeId("node1")
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
ShardsIterator shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("disk:ebs",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("dis*:eph*",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("bar*",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
try {
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("welma", clusterState.nodes());
fail("shouldve raised illegalArgumentException");
} catch (IllegalArgumentException illegal) {
//expected exception
}
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
}
@Test
public void testShardsAndPreferNodeRouting() {

View File

@ -105,6 +105,10 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
public static DiscoveryNode newNode(String nodeName,String nodeId, Map<String, String> attributes) {
return new DiscoveryNode(nodeName, nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
public static DiscoveryNode newNode(String nodeId, Version version) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version);
}

View File

@ -33,6 +33,10 @@ The `preference` is a query string parameter which can be set to:
and `3` in this case). This preference can be combined with other
preferences but it has to appear first: `_shards:2,3;_primary`
`_only_nodes`::
Restricts the operation to nodes specified in node specification
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html
Custom (string) value::
A custom value will be used to guarantee that
the same shards will be used for the same custom value. This can help