Fixes for _only_nodes preferences:
* Handle multiple attributes/name (coma separated). * Shuffle the nodes that match the preferences. Fix #12546 Fix #12700
This commit is contained in:
parent
cb2cfdd9c0
commit
238d390637
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -331,15 +332,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
|
||||
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
|
||||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
// fill it in a randomized fashion
|
||||
for (int i = 0; i < activeShards.size(); i++) {
|
||||
ShardRouting shardRouting = activeShards.get(i);
|
||||
int seed = shuffler.nextSeed();
|
||||
for (ShardRouting shardRouting : shuffler.shuffle(activeShards, seed)) {
|
||||
if (nodeId.equals(shardRouting.currentNodeId())) {
|
||||
ordered.add(shardRouting);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < allInitializingShards.size(); i++) {
|
||||
ShardRouting shardRouting = allInitializingShards.get(i);
|
||||
for (ShardRouting shardRouting : shuffler.shuffle(allInitializingShards, seed)) {
|
||||
if (nodeId.equals(shardRouting.currentNodeId())) {
|
||||
ordered.add(shardRouting);
|
||||
}
|
||||
|
@ -347,26 +346,31 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return new PlainShardIterator(shardId, ordered);
|
||||
}
|
||||
|
||||
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttributes, DiscoveryNodes discoveryNodes) {
|
||||
return onlyNodeSelectorActiveInitializingShardsIt(new String[] {nodeAttributes}, discoveryNodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns shards based on nodeAttributes given such as node name , node attribute, node IP
|
||||
* Supports node specifications in cluster API
|
||||
*/
|
||||
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttribute, DiscoveryNodes discoveryNodes) {
|
||||
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String[] nodeAttributes, DiscoveryNodes discoveryNodes) {
|
||||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttribute));
|
||||
|
||||
for (ShardRouting shardRouting : activeShards) {
|
||||
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttributes));
|
||||
int seed = shuffler.nextSeed();
|
||||
for (ShardRouting shardRouting : shuffler.shuffle(activeShards, seed)) {
|
||||
if (selectedNodes.contains(shardRouting.currentNodeId())) {
|
||||
ordered.add(shardRouting);
|
||||
}
|
||||
}
|
||||
for (ShardRouting shardRouting : allInitializingShards) {
|
||||
for (ShardRouting shardRouting : shuffler.shuffle(allInitializingShards, seed)) {
|
||||
if (selectedNodes.contains(shardRouting.currentNodeId())) {
|
||||
ordered.add(shardRouting);
|
||||
}
|
||||
}
|
||||
if (ordered.isEmpty()) {
|
||||
throw new IllegalArgumentException("No data node with criteria [" + nodeAttribute + "] found");
|
||||
throw new IllegalArgumentException("no data nodes with critera(s) " +
|
||||
Strings.arrayToCommaDelimitedString(nodeAttributes) + "] found for shard:" + shardId());
|
||||
}
|
||||
return new PlainShardIterator(shardId, ordered);
|
||||
}
|
||||
|
|
|
@ -177,8 +177,8 @@ public class OperationRouting extends AbstractComponent {
|
|||
ensureNodeIdExists(nodes, nodeId);
|
||||
return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
|
||||
case ONLY_NODES:
|
||||
String nodeAttribute = preference.substring(Preference.ONLY_NODES.type().length() + 1);
|
||||
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttribute, nodes);
|
||||
String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
|
||||
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
|
||||
default:
|
||||
throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
|
||||
}
|
||||
|
|
|
@ -320,6 +320,24 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
assertThat(shardsIterator.size(), equalTo(1));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
|
||||
|
||||
shardsIterator = clusterState.routingTable().index("test").shard(0)
|
||||
.onlyNodeSelectorActiveInitializingShardsIt(new String[] {"disk:eph*","disk:ebs"},clusterState.nodes());
|
||||
assertThat(shardsIterator.size(), equalTo(2));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
|
||||
|
||||
shardsIterator = clusterState.routingTable().index("test").shard(0)
|
||||
.onlyNodeSelectorActiveInitializingShardsIt(new String[] {"disk:*", "invalid_name"},clusterState.nodes());
|
||||
assertThat(shardsIterator.size(), equalTo(2));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
|
||||
|
||||
shardsIterator = clusterState.routingTable().index("test").shard(0)
|
||||
.onlyNodeSelectorActiveInitializingShardsIt(new String[] {"disk:*", "disk:*"},clusterState.nodes());
|
||||
assertThat(shardsIterator.size(), equalTo(2));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
|
||||
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
|
||||
|
||||
try {
|
||||
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("welma", clusterState.nodes());
|
||||
fail("should have raised illegalArgumentException");
|
||||
|
|
|
@ -19,22 +19,30 @@
|
|||
|
||||
package org.elasticsearch.search.preference;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
|
||||
public class SearchPreferenceIT extends ESIntegTestCase {
|
||||
|
@ -155,4 +163,63 @@ public class SearchPreferenceIT extends ESIntegTestCase {
|
|||
assertThat(e.getMessage(), is("No data node with id[DOES-NOT-EXIST] found"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testNodesOnlyRandom() throws Exception {
|
||||
assertAcked(prepareCreate("test").setSettings(
|
||||
//this test needs at least a replica to make sure two consecutive searches go to two different copies of the same data
|
||||
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_REPLICAS, between(1, maximumNumberOfReplicas()))));
|
||||
ensureGreen();
|
||||
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
|
||||
refresh();
|
||||
|
||||
final Client client = internalCluster().smartClient();
|
||||
SearchRequestBuilder request = client.prepareSearch("test")
|
||||
.setQuery(matchAllQuery()).setPreference("_only_nodes:*,nodes*"); // multiple wildchar to cover multi-param usecase
|
||||
assertSearchOnRandomNodes(request);
|
||||
|
||||
request = client.prepareSearch("test")
|
||||
.setQuery(matchAllQuery()).setPreference("_only_nodes:*");
|
||||
assertSearchOnRandomNodes(request);
|
||||
|
||||
ArrayList<String> allNodeIds = new ArrayList<>();
|
||||
ArrayList<String> allNodeNames = new ArrayList<>();
|
||||
ArrayList<String> allNodeHosts = new ArrayList<>();
|
||||
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
|
||||
for (NodeStats node : nodeStats.getNodes()) {
|
||||
allNodeIds.add(node.getNode().getId());
|
||||
allNodeNames.add(node.getNode().getName());
|
||||
allNodeHosts.add(node.getHostname());
|
||||
}
|
||||
|
||||
String node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeIds.toArray());
|
||||
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
|
||||
assertSearchOnRandomNodes(request);
|
||||
|
||||
node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeNames.toArray());
|
||||
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
|
||||
assertSearchOnRandomNodes(request);
|
||||
|
||||
node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeHosts.toArray());
|
||||
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
|
||||
assertSearchOnRandomNodes(request);
|
||||
|
||||
node_expr = "_only_nodes:" + Strings.arrayToCommaDelimitedString(allNodeHosts.toArray());
|
||||
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
|
||||
assertSearchOnRandomNodes(request);
|
||||
|
||||
// Mix of valid and invalid nodes
|
||||
node_expr = "_only_nodes:*,invalidnode";
|
||||
request = client.prepareSearch("test").setQuery(matchAllQuery()).setPreference(node_expr);
|
||||
assertSearchOnRandomNodes(request);
|
||||
}
|
||||
|
||||
private void assertSearchOnRandomNodes(SearchRequestBuilder request) {
|
||||
Set<String> hitNodes = new HashSet<>();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
SearchResponse searchResponse = request.execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
|
||||
hitNodes.add(searchResponse.getHits().getAt(0).shard().nodeId());
|
||||
}
|
||||
assertThat(hitNodes.size(), greaterThan(1));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue