Merge pull request #18483 from jimferenczi/fix_only_nodes

Fix _only_nodes preferences
This commit is contained in:
Jim Ferenczi 2016-05-23 10:03:44 +02:00
commit 51d3fbf296
4 changed files with 104 additions and 15 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -331,15 +332,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
for (int i = 0; i < activeShards.size(); i++) {
ShardRouting shardRouting = activeShards.get(i);
int seed = shuffler.nextSeed();
for (ShardRouting shardRouting : shuffler.shuffle(activeShards, seed)) {
if (nodeId.equals(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
for (int i = 0; i < allInitializingShards.size(); i++) {
ShardRouting shardRouting = allInitializingShards.get(i);
for (ShardRouting shardRouting : shuffler.shuffle(allInitializingShards, seed)) {
if (nodeId.equals(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
@ -347,26 +346,31 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttributes, DiscoveryNodes discoveryNodes) {
return onlyNodeSelectorActiveInitializingShardsIt(new String[] {nodeAttributes}, discoveryNodes);
}
/**
* Returns shards based on nodeAttributes given such as node name , node attribute, node IP
* Supports node specifications in cluster API
*/
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttribute, DiscoveryNodes discoveryNodes) {
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String[] nodeAttributes, DiscoveryNodes discoveryNodes) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttribute));
for (ShardRouting shardRouting : activeShards) {
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttributes));
int seed = shuffler.nextSeed();
for (ShardRouting shardRouting : shuffler.shuffle(activeShards, seed)) {
if (selectedNodes.contains(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
for (ShardRouting shardRouting : allInitializingShards) {
for (ShardRouting shardRouting : shuffler.shuffle(allInitializingShards, seed)) {
if (selectedNodes.contains(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
if (ordered.isEmpty()) {
throw new IllegalArgumentException("No data node with criteria [" + nodeAttribute + "] found");
throw new IllegalArgumentException("no data nodes with critera(s) " +
Strings.arrayToCommaDelimitedString(nodeAttributes) + "] found for shard:" + shardId());
}
return new PlainShardIterator(shardId, ordered);
}

View File

@ -177,8 +177,8 @@ public class OperationRouting extends AbstractComponent {
ensureNodeIdExists(nodes, nodeId);
return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
case ONLY_NODES:
String nodeAttribute = preference.substring(Preference.ONLY_NODES.type().length() + 1);
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttribute, nodes);
String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
default:
throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
}

View File

@ -320,6 +320,24 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
assertThat(shardsIterator.size(), equalTo(1));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
shardsIterator = clusterState.routingTable().index("test").shard(0)
.onlyNodeSelectorActiveInitializingShardsIt(new String[] {"disk:eph*","disk:ebs"},clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(2));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
shardsIterator = clusterState.routingTable().index("test").shard(0)
.onlyNodeSelectorActiveInitializingShardsIt(new String[] {"disk:*", "invalid_name"},clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(2));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
shardsIterator = clusterState.routingTable().index("test").shard(0)
.onlyNodeSelectorActiveInitializingShardsIt(new String[] {"disk:*", "disk:*"},clusterState.nodes());
assertThat(shardsIterator.size(), equalTo(2));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
try {
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("welma", clusterState.nodes());
fail("should have raised illegalArgumentException");

View File

@ -19,22 +19,30 @@
package org.elasticsearch.search.preference;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class SearchPreferenceIT extends ESIntegTestCase {
@ -155,4 +163,63 @@ public class SearchPreferenceIT extends ESIntegTestCase {
assertThat(e.getMessage(), is("No data node with id[DOES-NOT-EXIST] found"));
}
}
public void testNodesOnlyRandom() throws Exception {
assertAcked(prepareCreate("test").setSettings(
//this test needs at least a replica to make sure two consecutive searches go to two different copies of the same data
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_REPLICAS, between(1, maximumNumberOfReplicas()))));
ensureGreen();
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
refresh();
final Client client = internalCluster().smartClient();
SearchRequestBuilder request = client.prepareSearch("test")
.setQuery(matchAllQuery()).setPreference("_only_nodes:*,nodes*"); // multiple wildchar to cover multi-param usecase
assertSearchOnRandomNodes(request);
request = client.prepareSearch("test")
.setQuery(matchAllQuery()).setPreference("_only_nodes:*");
assertSearchOnRandomNodes(request);
ArrayList<String> allNodeIds = new ArrayList<>();
ArrayList<String> allNodeNames = new ArrayList<>();
ArrayList<String> allNodeHosts = new ArrayList<>();
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats node : nodeStats.getNodes()) {
allNodeIds.add(node.getNode().getId());
allNodeNames.add(node.getNode().getName());
allNodeHosts.add(node.getHostname());
}
String node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeIds.toArray());
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
assertSearchOnRandomNodes(request);
node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeNames.toArray());
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
assertSearchOnRandomNodes(request);
node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeHosts.toArray());
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
assertSearchOnRandomNodes(request);
node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeHosts.toArray());
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
assertSearchOnRandomNodes(request);
// Mix of valid and invalid nodes
node_expr = "_only_nodes:*,invalidnode";
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
assertSearchOnRandomNodes(request);
}
private void assertSearchOnRandomNodes(SearchRequestBuilder request) {
Set<String> hitNodes = new HashSet<>();
for (int i = 0; i < 2; i++) {
SearchResponse searchResponse = request.execute().actionGet();
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
hitNodes.add(searchResponse.getHits().getAt(0).shard().nodeId());
}
assertThat(hitNodes.size(), greaterThan(1));
}
}