diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 55fa01747b3..ef5011f4dbb 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -53,6 +53,7 @@ public class IndexShardRoutingTable implements Iterable { final ImmutableList shards; final ImmutableList activeShards; final ImmutableList assignedShards; + final static ImmutableList NO_SHARDS = ImmutableList.of(); final boolean allShardsStarted; /** @@ -279,6 +280,16 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, ordered); } + /** + * Returns true if no primaries are active or initializing for this shard + */ + private boolean noPrimariesActive() { + if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) { + return true; + } + return false; + } + /** * Returns an iterator only on the primary shard. */ @@ -287,9 +298,8 @@ public class IndexShardRoutingTable implements Iterable { } public ShardIterator primaryActiveInitializingShardIt() { - if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) { - List primaryList = ImmutableList.of(); - return new PlainShardIterator(shardId, primaryList); + if (noPrimariesActive()) { + return new PlainShardIterator(shardId, NO_SHARDS); } return primaryShardIt(); } @@ -312,6 +322,49 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, ordered); } + public ShardIterator replicaActiveInitializingShardIt() { + // If the primaries are unassigned, return an empty list (there aren't + // any replicas to query anyway) + if (noPrimariesActive()) { + return new PlainShardIterator(shardId, NO_SHARDS); + } + + LinkedList ordered = new LinkedList<>(); + for (ShardRouting replica : shuffler.shuffle(replicas)) { + if (replica.active()) { + ordered.addFirst(replica); + } else if (replica.initializing()) { + ordered.addLast(replica); + } + } + return new PlainShardIterator(shardId, ordered); + } + + public ShardIterator replicaFirstActiveInitializingShardsIt() { + // If the primaries are unassigned, return an empty list (there aren't + // any replicas to query anyway) + if (noPrimariesActive()) { + return new PlainShardIterator(shardId, NO_SHARDS); + } + + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + // fill it in a randomized fashion with the active replicas + for (ShardRouting replica : shuffler.shuffle(replicas)) { + if (replica.active()) { + ordered.add(replica); + } + } + + // Add the primary shard + ordered.add(primary); + + // Add initializing shards last + if (!allInitializingShards.isEmpty()) { + ordered.addAll(allInitializingShards); + } + return new PlainShardIterator(shardId, ordered); + } + public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) { ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); // fill it in a randomized fashion diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index ae12a387b73..6db68524992 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -174,8 +174,12 @@ public class OperationRouting extends AbstractComponent { return indexShard.preferNodeActiveInitializingShardsIt(localNodeId); case PRIMARY: return indexShard.primaryActiveInitializingShardIt(); + case REPLICA: + return indexShard.replicaActiveInitializingShardIt(); case PRIMARY_FIRST: return indexShard.primaryFirstActiveInitializingShardsIt(); + case REPLICA_FIRST: + return indexShard.replicaFirstActiveInitializingShardsIt(); case ONLY_LOCAL: return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId); case ONLY_NODE: diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java b/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java index e9057bfe681..6de251b9d52 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/Preference.java @@ -44,11 +44,21 @@ public enum Preference { */ PRIMARY("_primary"), + /** + * Route to replica shards + */ + REPLICA("_replica"), + /** * Route to primary shards first */ PRIMARY_FIRST("_primary_first"), + /** + * Route to replica shards first + */ + REPLICA_FIRST("_replica_first"), + /** * Route to the local shard only */ @@ -96,9 +106,14 @@ public enum Preference { return LOCAL; case "_primary": return PRIMARY; + case "_replica": + return REPLICA; case "_primary_first": case "_primaryFirst": return PRIMARY_FIRST; + case "_replica_first": + case "_replicaFirst": + return REPLICA_FIRST; case "_only_local": case "_onlyLocal": return ONLY_LOCAL; diff --git a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index e5d7e7b917b..d54f931e8d8 100644 --- a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -381,4 +381,90 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase { assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0)); assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1")); } + + @Test + public void testReplicaShardPreferenceIters() throws Exception { + AllocationService strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .build()); + + OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider()); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(newNode("node1")) + .put(newNode("node2")) + .put(newNode("node3")) + .localNodeId("node1") + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + // When replicas haven't initialized, it comes back with the primary first, then initializing replicas + GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first"); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + ShardIterator iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard + ShardRouting routing = iter.nextOrNull(); + assertNotNull(routing); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertTrue(routing.primary()); // replicas haven't initialized yet, so primary is first + assertTrue(routing.started()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + assertTrue(routing.initializing()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + assertTrue(routing.initializing()); + + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + + shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica"); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(2)); // two potential replicas for the shard + routing = iter.nextOrNull(); + assertNotNull(routing); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + + shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first"); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard + routing = iter.nextOrNull(); + assertNotNull(routing); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + // finally the primary + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertTrue(routing.primary()); + } + } \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceTests.java b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceTests.java index bdfac267224..dc125558209 100644 --- a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceTests.java +++ b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceTests.java @@ -88,7 +88,7 @@ public class SearchPreferenceTests extends ElasticsearchIntegrationTest { @Test public void simplePreferenceTests() throws Exception { - createIndex("test"); + client().admin().indices().prepareCreate("test").setSettings("number_of_replicas=1").get(); ensureGreen(); client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet(); @@ -104,12 +104,47 @@ public class SearchPreferenceTests extends ElasticsearchIntegrationTest { searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet(); assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); + assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); + assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); } + @Test + public void testReplicaPreference() throws Exception { + client().admin().indices().prepareCreate("test").setSettings("number_of_replicas=0").get(); + ensureGreen(); + + client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet(); + refresh(); + + try { + client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + fail("should have failed because there are no replicas"); + } catch (Exception e) { + // pass + } + + SearchResponse resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); + assertThat(resp.getHits().totalHits(), equalTo(1l)); + + client().admin().indices().prepareUpdateSettings("test").setSettings("number_of_replicas=1").get(); + ensureGreen("test"); + + resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + assertThat(resp.getHits().totalHits(), equalTo(1l)); + } + @Test (expected = IllegalArgumentException.class) public void testThatSpecifyingNonExistingNodesReturnsUsefulError() throws Exception { createIndex("test"); diff --git a/docs/reference/search/request/preference.asciidoc b/docs/reference/search/request/preference.asciidoc index 28ec3bd96b8..0d07f29475e 100644 --- a/docs/reference/search/request/preference.asciidoc +++ b/docs/reference/search/request/preference.asciidoc @@ -16,6 +16,13 @@ The `preference` is a query string parameter which can be set to: The operation will go and be executed on the primary shard, and if not available (failover), will execute on other shards. +`_replica`:: + The operation will go and be executed only on a replica shard. + +`_replica_first`:: + The operation will go and be executed only on a replica shard, and if + not available (failover), will execute on other shards. + `_local`:: The operation will prefer to be executed on a local allocated shard if possible.