Add search preference to prefer multiple nodes
The search preference _prefer_node allows specifying a single node to prefer when routing a request. This functionality can be enhanced by permitting multiple nodes to be preferred. This commit replaces the search preference _prefer_node with the search preference _prefer_nodes which supplants the former by specifying a single node and otherwise adds functionality. Relates #18872
This commit is contained in:
parent
d0e4485d42
commit
e96722d91c
|
@ -375,21 +375,22 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return new PlainShardIterator(shardId, ordered);
|
||||
}
|
||||
|
||||
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
|
||||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
public ShardIterator preferNodeActiveInitializingShardsIt(Set<String> nodeIds) {
|
||||
ArrayList<ShardRouting> preferred = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
ArrayList<ShardRouting> notPreferred = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
// fill it in a randomized fashion
|
||||
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
|
||||
ordered.add(shardRouting);
|
||||
if (nodeId.equals(shardRouting.currentNodeId())) {
|
||||
// switch, its the matching node id
|
||||
ordered.set(ordered.size() - 1, ordered.get(0));
|
||||
ordered.set(0, shardRouting);
|
||||
if (nodeIds.contains(shardRouting.currentNodeId())) {
|
||||
preferred.add(shardRouting);
|
||||
} else {
|
||||
notPreferred.add(shardRouting);
|
||||
}
|
||||
}
|
||||
preferred.addAll(notPreferred);
|
||||
if (!allInitializingShards.isEmpty()) {
|
||||
ordered.addAll(allInitializingShards);
|
||||
preferred.addAll(allInitializingShards);
|
||||
}
|
||||
return new PlainShardIterator(shardId, ordered);
|
||||
return new PlainShardIterator(shardId, preferred);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -33,17 +33,15 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class OperationRouting extends AbstractComponent {
|
||||
|
||||
|
||||
private final AwarenessAllocationDecider awarenessAllocationDecider;
|
||||
|
||||
@Inject
|
||||
|
@ -158,10 +156,14 @@ public class OperationRouting extends AbstractComponent {
|
|||
}
|
||||
preferenceType = Preference.parse(preference);
|
||||
switch (preferenceType) {
|
||||
case PREFER_NODE:
|
||||
return indexShard.preferNodeActiveInitializingShardsIt(preference.substring(Preference.PREFER_NODE.type().length() + 1));
|
||||
case PREFER_NODES:
|
||||
final Set<String> nodesIds =
|
||||
Arrays.stream(
|
||||
preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
|
||||
).collect(Collectors.toSet());
|
||||
return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
|
||||
case LOCAL:
|
||||
return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
|
||||
return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
|
||||
case PRIMARY:
|
||||
return indexShard.primaryActiveInitializingShardIt();
|
||||
case REPLICA:
|
||||
|
|
|
@ -30,9 +30,9 @@ public enum Preference {
|
|||
SHARDS("_shards"),
|
||||
|
||||
/**
|
||||
* Route to preferred node, if possible
|
||||
* Route to preferred nodes, if possible
|
||||
*/
|
||||
PREFER_NODE("_prefer_node"),
|
||||
PREFER_NODES("_prefer_nodes"),
|
||||
|
||||
/**
|
||||
* Route to local node, if possible
|
||||
|
@ -98,8 +98,8 @@ public enum Preference {
|
|||
switch (preferenceType) {
|
||||
case "_shards":
|
||||
return SHARDS;
|
||||
case "_prefer_node":
|
||||
return PREFER_NODE;
|
||||
case "_prefer_nodes":
|
||||
return PREFER_NODES;
|
||||
case "_only_node":
|
||||
return ONLY_NODE;
|
||||
case "_local":
|
||||
|
@ -123,6 +123,7 @@ public enum Preference {
|
|||
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -18,16 +18,31 @@
|
|||
*/
|
||||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class OperationRoutingTests extends ESTestCase{
|
||||
|
||||
public void testGenerateShardId() {
|
||||
|
@ -170,4 +185,48 @@ public class OperationRoutingTests extends ESTestCase{
|
|||
assertEquals(shard, entry.getValue().intValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void testPreferNodes() throws InterruptedException, IOException {
|
||||
TestThreadPool threadPool = null;
|
||||
ClusterService clusterService = null;
|
||||
try {
|
||||
threadPool = new TestThreadPool("testPreferNodes");
|
||||
clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
final String indexName = "test";
|
||||
ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(indexName, true, randomInt(8)));
|
||||
final Index index = clusterService.state().metaData().index(indexName).getIndex();
|
||||
final List<ShardRouting> shards = clusterService.state().getRoutingNodes().assignedShards(new ShardId(index, 0));
|
||||
final int count = randomIntBetween(1, shards.size());
|
||||
int position = 0;
|
||||
final List<String> nodes = new ArrayList<>();
|
||||
final List<ShardRouting> expected = new ArrayList<>();
|
||||
for (int i = 0; i < count; i++) {
|
||||
if (randomBoolean() && !shards.get(position).initializing()) {
|
||||
nodes.add(shards.get(position).currentNodeId());
|
||||
expected.add(shards.get(position));
|
||||
position++;
|
||||
} else {
|
||||
nodes.add("missing_" + i);
|
||||
}
|
||||
}
|
||||
final ShardIterator it =
|
||||
new OperationRouting(Settings.EMPTY, new AwarenessAllocationDecider())
|
||||
.getShards(clusterService.state(), indexName, 0, "_prefer_nodes:" + String.join(",", nodes));
|
||||
final List<ShardRouting> all = new ArrayList<>();
|
||||
ShardRouting shard;
|
||||
while ((shard = it.nextOrNull()) != null) {
|
||||
all.add(shard);
|
||||
}
|
||||
final Set<ShardRouting> preferred = new HashSet<>();
|
||||
preferred.addAll(all.subList(0, expected.size()));
|
||||
// the preferred shards should be at the front of the list
|
||||
assertThat(preferred, containsInAnyOrder(expected.toArray()));
|
||||
// verify all the shards are there
|
||||
assertThat(all.size(), equalTo(shards.size()));
|
||||
} finally {
|
||||
IOUtils.close(clusterService);
|
||||
terminate(threadPool);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.test.ESAllocationTestCase;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
@ -401,15 +402,23 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
|
||||
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), not(equalTo(firstRoundNodeId)));
|
||||
|
||||
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0;_prefer_node:node1");
|
||||
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0;_prefer_nodes:node1");
|
||||
assertThat(shardIterators.size(), equalTo(1));
|
||||
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
|
||||
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1"));
|
||||
|
||||
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0;_prefer_node:node1");
|
||||
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0;_prefer_nodes:node1,node2");
|
||||
assertThat(shardIterators.size(), equalTo(1));
|
||||
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
|
||||
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1"));
|
||||
Iterator<ShardIterator> iterator = shardIterators.iterator();
|
||||
final ShardIterator it = iterator.next();
|
||||
assertThat(it.shardId().id(), equalTo(0));
|
||||
final String firstNodeId = it.nextOrNull().currentNodeId();
|
||||
assertThat(firstNodeId, anyOf(equalTo("node1"), equalTo("node2")));
|
||||
if ("node1".equals(firstNodeId)) {
|
||||
assertThat(it.nextOrNull().currentNodeId(), equalTo("node2"));
|
||||
} else {
|
||||
assertThat(it.nextOrNull().currentNodeId(), equalTo("node1"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testReplicaShardPreferenceIters() throws Exception {
|
||||
|
|
|
@ -56,7 +56,7 @@ public class SearchPreferenceIT extends ESIntegTestCase {
|
|||
refresh();
|
||||
internalCluster().stopRandomDataNode();
|
||||
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).execute().actionGet();
|
||||
String[] preferences = new String[] {"_primary", "_local", "_primary_first", "_prefer_node:somenode", "_prefer_node:server2"};
|
||||
String[] preferences = new String[] {"_primary", "_local", "_primary_first", "_prefer_nodes:somenode", "_prefer_nodes:server2", "_prefer_nodes:somenode,server2"};
|
||||
for (String pref : preferences) {
|
||||
logger.info("--> Testing out preference={}", pref);
|
||||
SearchResponse searchResponse = client().prepareSearch().setSize(0).setPreference(pref).execute().actionGet();
|
||||
|
|
|
@ -185,3 +185,10 @@ This is now consistent for source filtering on other places in the search API.
|
|||
In the response for profiling queries, the `query_type` has been renamed to `type` and `lucene` has been renamed to
|
||||
`description`. These changes have been made so the response format is more friendly to supporting other types of profiling
|
||||
in the future.
|
||||
|
||||
==== Search prefernce
|
||||
|
||||
The <<search-request-preference,search preference>> `_prefer_node` has
|
||||
been superseded by `_prefer_nodes`. By specifying a single node,
|
||||
`_prefer_nodes` provides the same functionality as `_prefer_node` but
|
||||
also supports specifying multiple nodes.
|
||||
|
|
|
@ -31,9 +31,9 @@ The `preference` is a query string parameter which can be set to:
|
|||
Restricts the search to execute only on a node with
|
||||
the provided node id (`xyz` in this case).
|
||||
|
||||
`_prefer_node:xyz`::
|
||||
Prefers execution on the node with the provided
|
||||
node id (`xyz` in this case) if applicable.
|
||||
`_prefer_nodes:abc,xyz`::
|
||||
Prefers execution on the nodes with the provided
|
||||
node ids (`abc` or `xyz` in this case) if applicable.
|
||||
|
||||
`_shards:2,3`::
|
||||
Restricts the operation to the specified shards. (`2`
|
||||
|
|
Loading…
Reference in New Issue