remove _primary and _replica shard preferences (#26791)
The shard preference _primary, _replica and its variants were useful for the asynchronous replication. However, with the current impl, they are no longer useful and should be removed. Closes #26335
This commit is contained in:
parent
9db21cd23f
commit
bf4c3642b2
|
@ -142,8 +142,8 @@ public class NoopSearchRequestBuilder extends ActionRequestBuilder<SearchRequest
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public NoopSearchRequestBuilder setPreference(String preference) {
|
||||
request.preference(preference);
|
||||
|
|
|
@ -146,8 +146,8 @@ public class ClusterSearchShardsRequest extends MasterNodeReadRequest<ClusterSea
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public ClusterSearchShardsRequest preference(String preference) {
|
||||
this.preference = preference;
|
||||
|
|
|
@ -55,8 +55,8 @@ public class ClusterSearchShardsRequestBuilder extends MasterNodeReadOperationRe
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public ClusterSearchShardsRequestBuilder setPreference(String preference) {
|
||||
request.preference(preference);
|
||||
|
|
|
@ -152,8 +152,8 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public GetRequest preference(String preference) {
|
||||
this.preference = preference;
|
||||
|
|
|
@ -76,8 +76,8 @@ public class GetRequestBuilder extends SingleShardOperationRequestBuilder<GetReq
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public GetRequestBuilder setPreference(String preference) {
|
||||
request.preference(preference);
|
||||
|
|
|
@ -284,8 +284,8 @@ public class MultiGetRequest extends ActionRequest implements Iterable<MultiGetR
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public MultiGetRequest preference(String preference) {
|
||||
this.preference = preference;
|
||||
|
|
|
@ -58,8 +58,8 @@ public class MultiGetRequestBuilder extends ActionRequestBuilder<MultiGetRequest
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public MultiGetRequestBuilder setPreference(String preference) {
|
||||
request.preference(preference);
|
||||
|
|
|
@ -64,8 +64,8 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public MultiGetShardRequest preference(String preference) {
|
||||
this.preference = preference;
|
||||
|
|
|
@ -241,8 +241,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public SearchRequest preference(String preference) {
|
||||
this.preference = preference;
|
||||
|
|
|
@ -144,8 +144,8 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public SearchRequestBuilder setPreference(String preference) {
|
||||
request.preference(preference);
|
||||
|
|
|
@ -59,8 +59,8 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public MultiTermVectorsShardRequest preference(String preference) {
|
||||
this.preference = preference;
|
||||
|
|
|
@ -294,8 +294,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across
|
||||
* shards. Can be set to <tt>_local</tt> to prefer local shards,
|
||||
* <tt>_primary</tt> to execute only on primary shards, or a custom value,
|
||||
* shards. Can be set to <tt>_local</tt> to prefer local shards or a custom value,
|
||||
* which guarantees that the same order will be used across different
|
||||
* requests.
|
||||
*/
|
||||
|
|
|
@ -99,8 +99,8 @@ public class TermVectorsRequestBuilder extends ActionRequestBuilder<TermVectorsR
|
|||
|
||||
/**
|
||||
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
|
||||
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
|
||||
* a custom value, which guarantees that the same order will be used across different requests.
|
||||
* <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
|
||||
* will be used across different requests.
|
||||
*/
|
||||
public TermVectorsRequestBuilder setPreference(String preference) {
|
||||
request.preference(preference);
|
||||
|
|
|
@ -441,74 +441,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return new PlainShardIterator(shardId, primaryAsList);
|
||||
}
|
||||
|
||||
public ShardIterator primaryActiveInitializingShardIt() {
|
||||
if (noPrimariesActive()) {
|
||||
return new PlainShardIterator(shardId, NO_SHARDS);
|
||||
}
|
||||
return primaryShardIt();
|
||||
}
|
||||
|
||||
public ShardIterator primaryFirstActiveInitializingShardsIt() {
|
||||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
// fill it in a randomized fashion
|
||||
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
|
||||
ordered.add(shardRouting);
|
||||
if (shardRouting.primary()) {
|
||||
// switch, its the matching node id
|
||||
ordered.set(ordered.size() - 1, ordered.get(0));
|
||||
ordered.set(0, shardRouting);
|
||||
}
|
||||
}
|
||||
// no need to worry about primary first here..., its temporal
|
||||
if (!allInitializingShards.isEmpty()) {
|
||||
ordered.addAll(allInitializingShards);
|
||||
}
|
||||
return new PlainShardIterator(shardId, ordered);
|
||||
}
|
||||
|
||||
public ShardIterator replicaActiveInitializingShardIt() {
|
||||
// If the primaries are unassigned, return an empty list (there aren't
|
||||
// any replicas to query anyway)
|
||||
if (noPrimariesActive()) {
|
||||
return new PlainShardIterator(shardId, NO_SHARDS);
|
||||
}
|
||||
|
||||
LinkedList<ShardRouting> ordered = new LinkedList<>();
|
||||
for (ShardRouting replica : shuffler.shuffle(replicas)) {
|
||||
if (replica.active()) {
|
||||
ordered.addFirst(replica);
|
||||
} else if (replica.initializing()) {
|
||||
ordered.addLast(replica);
|
||||
}
|
||||
}
|
||||
return new PlainShardIterator(shardId, ordered);
|
||||
}
|
||||
|
||||
public ShardIterator replicaFirstActiveInitializingShardsIt() {
|
||||
// If the primaries are unassigned, return an empty list (there aren't
|
||||
// any replicas to query anyway)
|
||||
if (noPrimariesActive()) {
|
||||
return new PlainShardIterator(shardId, NO_SHARDS);
|
||||
}
|
||||
|
||||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
// fill it in a randomized fashion with the active replicas
|
||||
for (ShardRouting replica : shuffler.shuffle(replicas)) {
|
||||
if (replica.active()) {
|
||||
ordered.add(replica);
|
||||
}
|
||||
}
|
||||
|
||||
// Add the primary shard
|
||||
ordered.add(primary);
|
||||
|
||||
// Add initializing shards last
|
||||
if (!allInitializingShards.isEmpty()) {
|
||||
ordered.addAll(allInitializingShards);
|
||||
}
|
||||
return new PlainShardIterator(shardId, ordered);
|
||||
}
|
||||
|
||||
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
|
||||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
|
||||
int seed = shuffler.nextSeed();
|
||||
|
|
|
@ -198,14 +198,6 @@ public class OperationRouting extends AbstractComponent {
|
|||
return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
|
||||
case LOCAL:
|
||||
return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
|
||||
case PRIMARY:
|
||||
return indexShard.primaryActiveInitializingShardIt();
|
||||
case REPLICA:
|
||||
return indexShard.replicaActiveInitializingShardIt();
|
||||
case PRIMARY_FIRST:
|
||||
return indexShard.primaryFirstActiveInitializingShardsIt();
|
||||
case REPLICA_FIRST:
|
||||
return indexShard.replicaFirstActiveInitializingShardsIt();
|
||||
case ONLY_LOCAL:
|
||||
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
|
||||
case ONLY_NODES:
|
||||
|
|
|
@ -39,26 +39,6 @@ public enum Preference {
|
|||
*/
|
||||
LOCAL("_local"),
|
||||
|
||||
/**
|
||||
* Route to primary shards
|
||||
*/
|
||||
PRIMARY("_primary"),
|
||||
|
||||
/**
|
||||
* Route to replica shards
|
||||
*/
|
||||
REPLICA("_replica"),
|
||||
|
||||
/**
|
||||
* Route to primary shards first
|
||||
*/
|
||||
PRIMARY_FIRST("_primary_first"),
|
||||
|
||||
/**
|
||||
* Route to replica shards first
|
||||
*/
|
||||
REPLICA_FIRST("_replica_first"),
|
||||
|
||||
/**
|
||||
* Route to the local shard only
|
||||
*/
|
||||
|
@ -97,16 +77,6 @@ public enum Preference {
|
|||
return PREFER_NODES;
|
||||
case "_local":
|
||||
return LOCAL;
|
||||
case "_primary":
|
||||
return PRIMARY;
|
||||
case "_replica":
|
||||
return REPLICA;
|
||||
case "_primary_first":
|
||||
case "_primaryFirst":
|
||||
return PRIMARY_FIRST;
|
||||
case "_replica_first":
|
||||
case "_replicaFirst":
|
||||
return REPLICA_FIRST;
|
||||
case "_only_local":
|
||||
case "_onlyLocal":
|
||||
return ONLY_LOCAL;
|
||||
|
|
|
@ -50,6 +50,7 @@ import static java.util.Collections.singletonMap;
|
|||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -415,10 +416,6 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
public void testReplicaShardPreferenceIters() throws Exception {
|
||||
AllocationService strategy = createAllocationService(Settings.builder()
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||
.build());
|
||||
|
||||
OperationRouting operationRouting = new OperationRouting(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
|
||||
|
@ -430,69 +427,22 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
.addAsNew(metaData.index("test"))
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
|
||||
final ClusterState clusterState = ClusterState
|
||||
.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
||||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1"))
|
||||
.add(newNode("node2"))
|
||||
.add(newNode("node3"))
|
||||
.localNodeId("node1"))
|
||||
.build();
|
||||
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1"))
|
||||
.add(newNode("node2"))
|
||||
.add(newNode("node3"))
|
||||
.localNodeId("node1")
|
||||
).build();
|
||||
clusterState = strategy.reroute(clusterState, "reroute");
|
||||
|
||||
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
|
||||
// When replicas haven't initialized, it comes back with the primary first, then initializing replicas
|
||||
GroupShardsIterator<ShardIterator> shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
|
||||
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
|
||||
ShardIterator iter = shardIterators.iterator().next();
|
||||
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
|
||||
ShardRouting routing = iter.nextOrNull();
|
||||
assertNotNull(routing);
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertTrue(routing.primary()); // replicas haven't initialized yet, so primary is first
|
||||
assertTrue(routing.started());
|
||||
routing = iter.nextOrNull();
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertFalse(routing.primary());
|
||||
assertTrue(routing.initializing());
|
||||
routing = iter.nextOrNull();
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertFalse(routing.primary());
|
||||
assertTrue(routing.initializing());
|
||||
|
||||
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
|
||||
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
|
||||
|
||||
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica");
|
||||
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
|
||||
iter = shardIterators.iterator().next();
|
||||
assertThat(iter.size(), equalTo(2)); // two potential replicas for the shard
|
||||
routing = iter.nextOrNull();
|
||||
assertNotNull(routing);
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertFalse(routing.primary());
|
||||
routing = iter.nextOrNull();
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertFalse(routing.primary());
|
||||
|
||||
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
|
||||
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
|
||||
iter = shardIterators.iterator().next();
|
||||
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
|
||||
routing = iter.nextOrNull();
|
||||
assertNotNull(routing);
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertFalse(routing.primary());
|
||||
routing = iter.nextOrNull();
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertFalse(routing.primary());
|
||||
// finally the primary
|
||||
routing = iter.nextOrNull();
|
||||
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
|
||||
assertTrue(routing.primary());
|
||||
String[] removedPreferences = {"_primary", "_primary_first", "_replica", "_replica_first"};
|
||||
for (String pref : removedPreferences) {
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> operationRouting.searchShards(clusterState, new String[]{"test"}, null, pref));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
|||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.cli.MockTerminal;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
|
@ -210,7 +211,10 @@ public class TruncateTranslogIT extends ESIntegTestCase {
|
|||
logger.info("--> starting the replica node to test recovery");
|
||||
internalCluster().startNode();
|
||||
ensureGreen("test");
|
||||
assertHitCount(client().prepareSearch("test").setPreference("_replica").setQuery(matchAllQuery()).get(), numDocsToKeep);
|
||||
for (String node : internalCluster().nodesInclude("test")) {
|
||||
SearchRequestBuilder q = client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery());
|
||||
assertHitCount(q.get(), numDocsToKeep);
|
||||
}
|
||||
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
|
||||
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
|
||||
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
|
||||
|
@ -308,7 +312,9 @@ public class TruncateTranslogIT extends ESIntegTestCase {
|
|||
logger.info("--> starting the replica node to test recovery");
|
||||
internalCluster().startNode();
|
||||
ensureGreen("test");
|
||||
assertHitCount(client().prepareSearch("test").setPreference("_replica").setQuery(matchAllQuery()).get(), totalDocs);
|
||||
for (String node : internalCluster().nodesInclude("test")) {
|
||||
assertHitCount(client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(), totalDocs);
|
||||
}
|
||||
|
||||
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
|
||||
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
|
||||
|
|
|
@ -406,8 +406,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|||
}
|
||||
});
|
||||
|
||||
// Wait for document to be indexed on primary
|
||||
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()));
|
||||
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));
|
||||
|
||||
// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
|
||||
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
@ -73,13 +74,21 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
|
|||
|
||||
logger.info("using preference {}", preference);
|
||||
// we want to make sure that while recovery happens, and a replica gets recovered, its properly refreshed
|
||||
ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();;
|
||||
ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();
|
||||
|
||||
while (status != ClusterHealthStatus.GREEN) {
|
||||
// first, verify that search on the primary search works
|
||||
SearchResponse searchResponse = client().prepareSearch("test").setPreference("_primary").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
|
||||
assertHitCount(searchResponse, 1);
|
||||
for (IndexShardRoutingTable shardRoutingTable : clusterService().state().routingTable().index("test")) {
|
||||
String primaryNode = shardRoutingTable.primaryShard().currentNodeId();
|
||||
SearchResponse searchResponse = client().prepareSearch("test")
|
||||
.setPreference("_only_nodes:" + primaryNode)
|
||||
.setQuery(QueryBuilders.termQuery("field", "test"))
|
||||
.execute().actionGet();
|
||||
assertHitCount(searchResponse, 1);
|
||||
break;
|
||||
}
|
||||
Client client = client();
|
||||
searchResponse = client.prepareSearch("test").setPreference(preference + Integer.toString(counter++)).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
|
||||
SearchResponse searchResponse = client.prepareSearch("test").setPreference(preference + Integer.toString(counter++)).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
|
||||
if (searchResponse.getHits().getTotalHits() != 1) {
|
||||
refresh();
|
||||
SearchResponse searchResponseAfterRefresh = client.prepareSearch("test").setPreference(preference).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
|
||||
|
@ -93,8 +102,13 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
|
|||
status = client().admin().cluster().prepareHealth("test").get().getStatus();
|
||||
internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);
|
||||
}
|
||||
SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
|
||||
assertHitCount(searchResponse, 1);
|
||||
|
||||
for (String node : internalCluster().nodesInclude("test")) {
|
||||
SearchResponse searchResponse = client().prepareSearch("test")
|
||||
.setPreference("_prefer_nodes:" + node)
|
||||
.setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
|
||||
assertHitCount(searchResponse, 1);
|
||||
}
|
||||
cluster().wipeIndices("test");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -301,7 +301,6 @@ public class MatchedQueriesIT extends ESIntegTestCase {
|
|||
.should(queryStringQuery("dolor").queryName("dolor"))
|
||||
.should(queryStringQuery("elit").queryName("elit"))
|
||||
)
|
||||
.setPreference("_primary")
|
||||
.get();
|
||||
|
||||
assertHitCount(searchResponse, 2L);
|
||||
|
|
|
@ -107,7 +107,7 @@ public class RandomScoreFunctionIT extends ESIntegTestCase {
|
|||
for (int o = 0; o < outerIters; o++) {
|
||||
final int seed = randomInt();
|
||||
String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!!
|
||||
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary)
|
||||
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
|
||||
while (preference.startsWith("_")) {
|
||||
preference = randomRealisticUnicodeOfLengthBetween(1, 10);
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ import static org.hamcrest.CoreMatchers.containsString;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
||||
|
@ -67,7 +66,7 @@ public class SearchPreferenceIT extends ESIntegTestCase {
|
|||
refresh();
|
||||
internalCluster().stopRandomDataNode();
|
||||
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).execute().actionGet();
|
||||
String[] preferences = new String[] {"_primary", "_local", "_primary_first", "_prefer_nodes:somenode", "_prefer_nodes:server2", "_prefer_nodes:somenode,server2"};
|
||||
String[] preferences = new String[]{"_local", "_prefer_nodes:somenode", "_prefer_nodes:server2", "_prefer_nodes:somenode,server2"};
|
||||
for (String pref : preferences) {
|
||||
logger.info("--> Testing out preference={}", pref);
|
||||
SearchResponse searchResponse = client().prepareSearch().setSize(0).setPreference(pref).execute().actionGet();
|
||||
|
@ -113,54 +112,14 @@ public class SearchPreferenceIT extends ESIntegTestCase {
|
|||
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
|
||||
refresh();
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
|
||||
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
|
||||
}
|
||||
|
||||
public void testReplicaPreference() throws Exception {
|
||||
client().admin().indices().prepareCreate("test").setSettings("{\"number_of_replicas\": 0}", XContentType.JSON).get();
|
||||
ensureGreen();
|
||||
|
||||
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
|
||||
refresh();
|
||||
|
||||
try {
|
||||
client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
|
||||
fail("should have failed because there are no replicas");
|
||||
} catch (Exception e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
SearchResponse resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
|
||||
assertThat(resp.getHits().getTotalHits(), equalTo(1L));
|
||||
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings("{\"number_of_replicas\": 1}", XContentType.JSON).get();
|
||||
ensureGreen("test");
|
||||
|
||||
resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
|
||||
assertThat(resp.getHits().getTotalHits(), equalTo(1L));
|
||||
}
|
||||
|
||||
public void testThatSpecifyingNonExistingNodesReturnsUsefulError() throws Exception {
|
||||
|
|
|
@ -134,14 +134,12 @@ public class QueryProfilerIT extends ESIntegTestCase {
|
|||
.setQuery(q)
|
||||
.setProfile(false)
|
||||
.addSort("_id", SortOrder.ASC)
|
||||
.setPreference("_primary")
|
||||
.setSearchType(SearchType.QUERY_THEN_FETCH);
|
||||
|
||||
SearchRequestBuilder profile = client().prepareSearch("test")
|
||||
.setQuery(q)
|
||||
.setProfile(true)
|
||||
.addSort("_id", SortOrder.ASC)
|
||||
.setPreference("_primary")
|
||||
.setSearchType(SearchType.QUERY_THEN_FETCH);
|
||||
|
||||
MultiSearchResponse.Item[] responses = client().prepareMultiSearch()
|
||||
|
|
|
@ -79,7 +79,7 @@ public class SimpleSearchIT extends ESIntegTestCase {
|
|||
int iters = scaledRandomIntBetween(10, 20);
|
||||
for (int i = 0; i < iters; i++) {
|
||||
String randomPreference = randomUnicodeOfLengthBetween(0, 4);
|
||||
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary)
|
||||
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
|
||||
while (randomPreference.startsWith("_")) {
|
||||
randomPreference = randomUnicodeOfLengthBetween(0, 4);
|
||||
}
|
||||
|
|
|
@ -275,10 +275,6 @@ replicas.
|
|||
|
||||
The `preference` can be set to:
|
||||
|
||||
`_primary`::
|
||||
The operation will go and be executed only on the primary
|
||||
shards.
|
||||
|
||||
`_local`::
|
||||
The operation will prefer to be executed on a local
|
||||
allocated shard if possible.
|
||||
|
|
|
@ -91,8 +91,7 @@ will control the version of the document the operation is intended to be
|
|||
executed against. A good example of a use case for versioning is
|
||||
performing a transactional read-then-update. Specifying a `version` from
|
||||
the document initially read ensures no changes have happened in the
|
||||
meantime (when reading in order to update, it is recommended to set
|
||||
`preference` to `_primary`). For example:
|
||||
meantime. For example:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
|
@ -242,7 +241,7 @@ The result of the above index operation is:
|
|||
[[index-routing]]
|
||||
=== Routing
|
||||
|
||||
By default, shard placement — or `routing` — is controlled by using a
|
||||
By default, shard placement ? or `routing` ? is controlled by using a
|
||||
hash of the document's id value. For more explicit control, the value
|
||||
fed into the hash function used by the router can be directly specified
|
||||
on a per-operation basis using the `routing` parameter. For example:
|
||||
|
|
|
@ -6,8 +6,11 @@
|
|||
Due to cross-cluster search using `:` to separate a cluster and index name,
|
||||
cluster names may no longer contain `:`.
|
||||
|
||||
==== new default for `wait_for_active_shards` parameter of the open index command
|
||||
==== New default for `wait_for_active_shards` parameter of the open index command
|
||||
|
||||
The default value for the `wait_for_active_shards` parameter of the open index API
|
||||
is changed from 0 to 1, which means that the command will now by default wait for all
|
||||
primary shards of the opened index to be allocated.
|
||||
|
||||
==== Shard preferences `_primary`, `_primary_first`, `_replica`, and `_replica_first` are removed
|
||||
These shard preferences are removed in favour of the `_prefer_nodes` and `_only_nodes` preferences.
|
||||
|
|
|
@ -7,21 +7,6 @@ search. By default, the operation is randomized among the available shard copies
|
|||
The `preference` is a query string parameter which can be set to:
|
||||
|
||||
[horizontal]
|
||||
`_primary`::
|
||||
The operation will go and be executed only on the primary
|
||||
shards.
|
||||
|
||||
`_primary_first`::
|
||||
The operation will go and be executed on the primary
|
||||
shard, and if not available (failover), will execute on other shards.
|
||||
|
||||
`_replica`::
|
||||
The operation will go and be executed only on a replica shard.
|
||||
|
||||
`_replica_first`::
|
||||
The operation will go and be executed only on a replica shard, and if
|
||||
not available (failover), will execute on other shards.
|
||||
|
||||
`_local`::
|
||||
The operation will prefer to be executed on a local
|
||||
allocated shard if possible.
|
||||
|
@ -33,7 +18,7 @@ The `preference` is a query string parameter which can be set to:
|
|||
`_shards:2,3`::
|
||||
Restricts the operation to the specified shards. (`2`
|
||||
and `3` in this case). This preference can be combined with other
|
||||
preferences but it has to appear first: `_shards:2,3|_primary`
|
||||
preferences but it has to appear first: `_shards:2,3|_local`
|
||||
|
||||
`_only_nodes`::
|
||||
Restricts the operation to nodes specified in <<cluster,node specification>>
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.http.entity.StringEntity;
|
|||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -37,12 +38,15 @@ import org.elasticsearch.test.rest.yaml.ObjectPath;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -227,17 +231,15 @@ public class FullClusterRestartIT extends ESRestTestCase {
|
|||
Map<String, Object> recoverRsp = toMap(client().performRequest("GET", "/" + index + "/_recovery"));
|
||||
logger.debug("--> recovery status:\n{}", recoverRsp);
|
||||
|
||||
Map<String, Object> responseBody = toMap(client().performRequest("GET", "/" + index + "/_search",
|
||||
Collections.singletonMap("preference", "_primary")));
|
||||
assertNoFailures(responseBody);
|
||||
int foundHits1 = (int) XContentMapValues.extractValue("hits.total", responseBody);
|
||||
|
||||
responseBody = toMap(client().performRequest("GET", "/" + index + "/_search",
|
||||
Collections.singletonMap("preference", "_replica")));
|
||||
assertNoFailures(responseBody);
|
||||
int foundHits2 = (int) XContentMapValues.extractValue("hits.total", responseBody);
|
||||
assertEquals(foundHits1, foundHits2);
|
||||
// TODO: do something more with the replicas! index?
|
||||
Set<Integer> counts = new HashSet<>();
|
||||
for (String node : dataNodes(index, client())) {
|
||||
Map<String, Object> responseBody = toMap(client().performRequest("GET", "/" + index + "/_search",
|
||||
Collections.singletonMap("preference", "_only_nodes:" + node)));
|
||||
assertNoFailures(responseBody);
|
||||
int hits = (int) XContentMapValues.extractValue("hits.total", responseBody);
|
||||
counts.add(hits);
|
||||
}
|
||||
assertEquals("All nodes should have a consistent number of documents", 1, counts.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -940,4 +942,15 @@ public class FullClusterRestartIT extends ESRestTestCase {
|
|||
logger.debug("Refreshing [{}]", index);
|
||||
client().performRequest("POST", "/" + index + "/_refresh");
|
||||
}
|
||||
|
||||
private List<String> dataNodes(String index, RestClient client) throws IOException {
|
||||
Response response = client.performRequest("GET", index + "/_stats", singletonMap("level", "shards"));
|
||||
List<String> nodes = new ArrayList<>();
|
||||
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0");
|
||||
for (Object shard : shardStats) {
|
||||
final String nodeId = ObjectPath.evaluate(shard, "routing.node");
|
||||
nodes.add(nodeId);
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,9 +171,6 @@ public class IndexingIT extends ESRestTestCase {
|
|||
assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5);
|
||||
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5);
|
||||
}
|
||||
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
|
||||
assertCount(index, "_primary", 5);
|
||||
assertCount(index, "_replica", 5);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -232,9 +229,10 @@ public class IndexingIT extends ESRestTestCase {
|
|||
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
|
||||
ensureGreen();
|
||||
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
|
||||
assertCount(index, "_primary", numDocs);
|
||||
assertCount(index, "_replica", numDocs);
|
||||
|
||||
for (Shard shard : buildShards(index, nodes, newNodeClient)) {
|
||||
assertCount(index, "_only_nodes:" + shard.node.nodeName, numDocs);
|
||||
}
|
||||
assertSeqNoOnShards(index, nodes, numDocs, newNodeClient);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,8 +11,6 @@
|
|||
|
||||
- do:
|
||||
count:
|
||||
# we count through the primary in case there is a replica that has not yet fully recovered
|
||||
preference: _primary
|
||||
index: test_index
|
||||
|
||||
- match: {count: 2}
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.cluster.routing.Preference;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -52,7 +51,7 @@ public class RandomizingClient extends FilterClient {
|
|||
SearchType.DFS_QUERY_THEN_FETCH,
|
||||
SearchType.QUERY_THEN_FETCH));
|
||||
if (random.nextInt(10) == 0) {
|
||||
defaultPreference = RandomPicks.randomFrom(random, EnumSet.of(Preference.PRIMARY_FIRST, Preference.LOCAL)).type();
|
||||
defaultPreference = Preference.LOCAL.type();
|
||||
} else if (random.nextInt(10) == 0) {
|
||||
String s = TestUtil.randomRealisticUnicodeString(random, 1, 10);
|
||||
defaultPreference = s.startsWith("_") ? null : s; // '_' is a reserved character
|
||||
|
|
Loading…
Reference in New Issue