From 72a9d34eb85a40535ac45b20d1a868576432c93f Mon Sep 17 00:00:00 2001 From: Nirmal Chidambaram Date: Mon, 1 Jun 2015 17:25:04 -0500 Subject: [PATCH] 5925 - Allow node specification in preference -Allow node selector api's with new preference ONLY_NODES ( selector apis like https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html) -Update documentation --- .../routing/IndexShardRoutingTable.java | 28 +++++++++ .../cluster/routing/OperationRouting.java | 3 + .../cluster/routing/Preference.java | 9 ++- .../structure/RoutingIteratorTests.java | 57 +++++++++++++++++++ .../test/ElasticsearchAllocationTestCase.java | 4 ++ .../search/request/preference.asciidoc | 4 ++ 6 files changed, 104 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index e76c8d924bc..3400827f8c0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -330,6 +331,33 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, ordered); } + /** + * Returns shards based on nodeAttributes given such as node name , node attribute, node IP + * Supports node specifications in cluster API + * + * @param nodeAttribute + * @param discoveryNodes + */ + public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttribute, DiscoveryNodes discoveryNodes) { + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + Set selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttribute)); + + for (ShardRouting shardRouting : activeShards) { + if (selectedNodes.contains(shardRouting.currentNodeId())) { + ordered.add(shardRouting); + } + } + for (ShardRouting shardRouting : allInitializingShards) { + if (selectedNodes.contains(shardRouting.currentNodeId())) { + ordered.add(shardRouting); + } + } + if (ordered.isEmpty()) { + throw new IllegalArgumentException("No data node with critera [" + nodeAttribute + "] found"); + } + return new PlainShardIterator(shardId, ordered); + } + public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) { ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); // fill it in a randomized fashion diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index ef46b6e8875..0ea80d18a0d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -183,6 +183,9 @@ public class OperationRouting extends AbstractComponent { String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1); ensureNodeIdExists(nodes, nodeId); return indexShard.onlyNodeActiveInitializingShardsIt(nodeId); + case ONLY_NODES: + String nodeAttribute = preference.substring(Preference.ONLY_NODES.type().length() + 1); + return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttribute, nodes); default: throw new IllegalArgumentException("unknown preference [" + preferenceType + "]"); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java b/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java index e8842f0de3e..e9057bfe681 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java @@ -57,7 +57,12 @@ public enum Preference { /** * Route to specific node only */ - ONLY_NODE("_only_node"); + ONLY_NODE("_only_node"), + + /** + * Route to only node with attribute + */ + ONLY_NODES("_only_nodes"); private final String type; @@ -97,6 +102,8 @@ public enum Preference { case "_only_local": case "_onlyLocal": return ONLY_LOCAL; + case "_only_nodes": + return ONLY_NODES; default: throw new IllegalArgumentException("no Preference for [" + preferenceType + "]"); } diff --git a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index 8bde8877d28..72ae1ffad18 100644 --- a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -262,6 +262,63 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase { assertThat(shardRouting.currentNodeId(), equalTo("node2")); } + @Test + public void testNodeSelectorRouting(){ + AllocationService strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") + .build()); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(newNode("fred","node1", ImmutableMap.of("disk", "ebs"))) + .put(newNode("barney","node2", ImmutableMap.of("disk", "ephemeral"))) + .localNodeId("node1") + ).build(); + + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + ShardsIterator shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("disk:ebs",clusterState.nodes()); + assertThat(shardsIterator.size(), equalTo(1)); + assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1")); + + shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("dis*:eph*",clusterState.nodes()); + assertThat(shardsIterator.size(), equalTo(1)); + assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2")); + + shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes()); + assertThat(shardsIterator.size(), equalTo(1)); + assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1")); + + shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("bar*",clusterState.nodes()); + assertThat(shardsIterator.size(), equalTo(1)); + assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2")); + + try { + shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("welma", clusterState.nodes()); + fail("shouldve raised illegalArgumentException"); + } catch (IllegalArgumentException illegal) { + //expected exception + } + + shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes()); + assertThat(shardsIterator.size(), equalTo(1)); + assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1")); + } + @Test public void testShardsAndPreferNodeRouting() { diff --git a/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java b/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java index c932d8219f1..885b77a1f8e 100644 --- a/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java @@ -105,6 +105,10 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); } + public static DiscoveryNode newNode(String nodeName,String nodeId, Map attributes) { + return new DiscoveryNode(nodeName, nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); + } + public static DiscoveryNode newNode(String nodeId, Version version) { return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version); } diff --git a/docs/reference/search/request/preference.asciidoc b/docs/reference/search/request/preference.asciidoc index 80904fb1aff..28ec3bd96b8 100644 --- a/docs/reference/search/request/preference.asciidoc +++ b/docs/reference/search/request/preference.asciidoc @@ -33,6 +33,10 @@ The `preference` is a query string parameter which can be set to: and `3` in this case). This preference can be combined with other preferences but it has to appear first: `_shards:2,3;_primary` +`_only_nodes`:: + Restricts the operation to nodes specified in node specification + https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html + Custom (string) value:: A custom value will be used to guarantee that the same shards will be used for the same custom value. This can help