Add _replica and _replica_first as search preference.

Just like specifying `?preference=_primary`, this adds the ability to
specify `?preference=_replica` or `?preference=_replica_first` on
requests that support it.

Resolves #12222
This commit is contained in:
Lee Hinman 2015-07-14 11:46:43 -06:00
parent 439c67ab15
commit a8391fcae9
6 changed files with 204 additions and 4 deletions

View File

@ -53,6 +53,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final static ImmutableList<ShardRouting> NO_SHARDS = ImmutableList.of();
final boolean allShardsStarted;
/**
@ -279,6 +280,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
/**
* Returns true if no primaries are active or initializing for this shard
*/
private boolean noPrimariesActive() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) {
return true;
}
return false;
}
/**
* Returns an iterator only on the primary shard.
*/
@ -287,9 +298,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator primaryActiveInitializingShardIt() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) {
List<ShardRouting> primaryList = ImmutableList.of();
return new PlainShardIterator(shardId, primaryList);
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
return primaryShardIt();
}
@ -312,6 +322,49 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator replicaActiveInitializingShardIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator replicaFirstActiveInitializingShardsIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion with the active replicas
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.add(replica);
}
}
// Add the primary shard
ordered.add(primary);
// Add initializing shards last
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion

View File

@ -174,8 +174,12 @@ public class OperationRouting extends AbstractComponent {
return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
case PRIMARY:
return indexShard.primaryActiveInitializingShardIt();
case REPLICA:
return indexShard.replicaActiveInitializingShardIt();
case PRIMARY_FIRST:
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODE:

View File

@ -44,11 +44,21 @@ public enum Preference {
*/
PRIMARY("_primary"),
/**
* Route to replica shards
*/
REPLICA("_replica"),
/**
* Route to primary shards first
*/
PRIMARY_FIRST("_primary_first"),
/**
* Route to replica shards first
*/
REPLICA_FIRST("_replica_first"),
/**
* Route to the local shard only
*/
@ -96,9 +106,14 @@ public enum Preference {
return LOCAL;
case "_primary":
return PRIMARY;
case "_replica":
return REPLICA;
case "_primary_first":
case "_primaryFirst":
return PRIMARY_FIRST;
case "_replica_first":
case "_replicaFirst":
return REPLICA_FIRST;
case "_only_local":
case "_onlyLocal":
return ONLY_LOCAL;

View File

@ -381,4 +381,90 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1"));
}
@Test
public void testReplicaShardPreferenceIters() throws Exception {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.build());
OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1"))
.put(newNode("node2"))
.put(newNode("node3"))
.localNodeId("node1")
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// When replicas haven't initialized, it comes back with the primary first, then initializing replicas
GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
ShardIterator iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
ShardRouting routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertTrue(routing.primary()); // replicas haven't initialized yet, so primary is first
assertTrue(routing.started());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
assertTrue(routing.initializing());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
assertTrue(routing.initializing());
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(2)); // two potential replicas for the shard
routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
// finally the primary
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertTrue(routing.primary());
}
}

View File

@ -88,7 +88,7 @@ public class SearchPreferenceTests extends ElasticsearchIntegrationTest {
@Test
public void simplePreferenceTests() throws Exception {
createIndex("test");
client().admin().indices().prepareCreate("test").setSettings("number_of_replicas=1").get();
ensureGreen();
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
@ -104,12 +104,47 @@ public class SearchPreferenceTests extends ElasticsearchIntegrationTest {
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
}
@Test
public void testReplicaPreference() throws Exception {
client().admin().indices().prepareCreate("test").setSettings("number_of_replicas=0").get();
ensureGreen();
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
refresh();
try {
client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
fail("should have failed because there are no replicas");
} catch (Exception e) {
// pass
}
SearchResponse resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(resp.getHits().totalHits(), equalTo(1l));
client().admin().indices().prepareUpdateSettings("test").setSettings("number_of_replicas=1").get();
ensureGreen("test");
resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(resp.getHits().totalHits(), equalTo(1l));
}
@Test (expected = IllegalArgumentException.class)
public void testThatSpecifyingNonExistingNodesReturnsUsefulError() throws Exception {
createIndex("test");

View File

@ -16,6 +16,13 @@ The `preference` is a query string parameter which can be set to:
The operation will go and be executed on the primary
shard, and if not available (failover), will execute on other shards.
`_replica`::
The operation will go and be executed only on a replica shard.
`_replica_first`::
The operation will go and be executed only on a replica shard, and if
not available (failover), will execute on other shards.
`_local`::
The operation will prefer to be executed on a local
allocated shard if possible.