Search / Get: Allow to specify a preference on which shards (or order) it will be executed, closes #769.

This commit is contained in:
kimchy 2011-03-14 11:30:01 +02:00
parent 3192654e80
commit 1bcd3b67ee
19 changed files with 229 additions and 24 deletions

View File

@ -65,7 +65,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), null);
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), null, null);
}
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -83,7 +83,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing(), null);
}
@Override protected void checkBlock(CountRequest request, ClusterState state) {

View File

@ -99,6 +99,15 @@ public class GetRequest extends SingleShardOperationRequest {
return this;
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public GetRequest preference(String preference) {
this.preference = preference;
return this;
}
/**
* Explicitly specify the fields that will be returned. By default, the <tt>_source</tt>

View File

@ -71,6 +71,7 @@ public class SearchRequest implements ActionRequest {
@Nullable private String queryHint;
@Nullable private String routing;
@Nullable private String preference;
private byte[] source;
private int sourceOffset;
@ -220,6 +221,20 @@ public class SearchRequest implements ActionRequest {
return this;
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public SearchRequest preference(String preference) {
this.preference = preference;
return this;
}
public String preference() {
return this.preference;
}
/**
* The search type to execute, defaults to {@link SearchType#DEFAULT}.
*/
@ -509,6 +524,9 @@ public class SearchRequest implements ActionRequest {
if (in.readBoolean()) {
routing = in.readUTF();
}
if (in.readBoolean()) {
preference = in.readUTF();
}
if (in.readBoolean()) {
scroll = readScroll(in);
@ -567,6 +585,12 @@ public class SearchRequest implements ActionRequest {
out.writeBoolean(true);
out.writeUTF(routing);
}
if (preference == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(preference);
}
if (scroll == null) {
out.writeBoolean(false);

View File

@ -85,7 +85,7 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
try {
ClusterState clusterState = clusterService.state();
searchRequest.indices(clusterState.metaData().concreteIndices(searchRequest.indices()));
GroupShardsIterator groupIt = clusterService.operationRouting().searchShards(clusterState, searchRequest.indices(), searchRequest.queryHint(), searchRequest.routing());
GroupShardsIterator groupIt = clusterService.operationRouting().searchShards(clusterState, searchRequest.indices(), searchRequest.queryHint(), searchRequest.routing(), searchRequest.preference());
if (groupIt.size() == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_AND_FETCH);

View File

@ -110,7 +110,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing());
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing(), request.preference());
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeActiveWith1ForEmpty();

View File

@ -36,6 +36,7 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
protected String type;
protected String id;
protected String routing;
protected String preference;
private boolean threadedListener = false;
private boolean threadedOperation = true;
@ -84,6 +85,10 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
return this.routing;
}
public String preference() {
return this.preference;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -118,6 +123,9 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
if (in.readBoolean()) {
routing = in.readUTF();
}
if (in.readBoolean()) {
preference = in.readUTF();
}
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@ -131,6 +139,12 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
out.writeBoolean(true);
out.writeUTF(routing);
}
if (preference == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(preference);
}
}
}

View File

@ -109,7 +109,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
checkBlock(request, clusterState);
this.shardIt = clusterService.operationRouting()
.getShards(clusterState, request.index(), request.type(), request.id(), request.routing());
.getShards(clusterState, request.index(), request.type(), request.id(), request.routing(), request.preference());
}
public void start() {

View File

@ -70,6 +70,16 @@ public class GetRequestBuilder extends BaseRequestBuilder<GetRequest, GetRespons
return this;
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public GetRequestBuilder setPreference(String preference) {
request.preference(preference);
return this;
}
/**
* Explicitly specify the fields that will be returned. By default, the <tt>_source</tt>
* field will be returned.

View File

@ -152,6 +152,16 @@ public class SearchRequestBuilder extends BaseRequestBuilder<SearchRequest, Sear
return this;
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public SearchRequestBuilder setPreference(String preference) {
request.preference(preference);
return this;
}
/**
* Controls the the search operation threading model.
*/

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -99,6 +100,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return count;
}
/**
* Returns a regular shard iterator.
*/
public ShardIterator shardsIt() {
return new PlainShardIterator(shardId, shards);
}
@ -107,6 +111,43 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, shards, index);
}
/**
* Returns an iterator only on the primary shard.
*/
public ShardIterator primaryShardIt() {
ShardRouting primary = primaryShard();
if (primary == null) {
return new PlainShardIterator(shardId, ImmutableList.<ShardRouting>of());
}
return new PlainShardIterator(shardId, ImmutableList.of(primary));
}
/**
* Prefers execution on the local node if applicable.
*/
public ShardIterator preferLocalShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(this.shards.size());
// fill it in a randomized fashion
int index = counter.getAndIncrement();
for (int i = 0; i < this.shards.size(); i++) {
int loc = (index + i) % this.shards.size();
ordered.add(this.shards.get(loc));
}
// find the local one, and push it upfront
for (int i = 0; i < ordered.size(); i++) {
ShardRouting current = ordered.get(i);
if (nodeId.equals(current.currentNodeId())) {
ordered.set(i, ordered.get(0));
ordered.set(0, current);
break;
}
}
return new PlainShardIterator(shardId, ordered);
}
/**
* Returns a random shards iterator.
*/
public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shards, counter.getAndIncrement());
}

View File

@ -37,9 +37,9 @@ public interface OperationRouting {
GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException, IndexShardMissingException;
ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) throws IndexMissingException, IndexShardMissingException;
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing, @Nullable String preference) throws IndexMissingException;
}

View File

@ -26,16 +26,20 @@ import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
*/
public class DjbHashFunction implements HashFunction {
@Override public int hash(String routing) {
public static int DJB_HASH(String value) {
long hash = 5381;
for (int i = 0; i < routing.length(); i++) {
hash = ((hash << 5) + hash) + routing.charAt(i);
for (int i = 0; i < value.length(); i++) {
hash = ((hash << 5) + hash) + value.charAt(i);
}
return (int) hash;
}
@Override public int hash(String routing) {
return DJB_HASH(routing);
}
@Override public int hash(String type, String id) {
long hash = 5381;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -65,8 +66,8 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
return shards(clusterState, index, type, id, routing).shardsIt();
}
@Override public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id, routing).shardsRandomIt();
@Override public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) throws IndexMissingException, IndexShardMissingException {
return preferenceShardIterator(shards(clusterState, index, type, id, routing), clusterState.nodes().localNodeId(), preference);
}
@Override public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException {
@ -97,7 +98,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
return new GroupShardsIterator(set);
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing) throws IndexMissingException {
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing, @Nullable String preference) throws IndexMissingException {
if (indices == null || indices.length == 0) {
indices = clusterState.metaData().concreteAllIndices();
}
@ -119,7 +120,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, they will override one another
set.add(indexShard.shardsRandomIt());
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
}
}
return new GroupShardsIterator(set);
@ -129,13 +130,27 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
for (String index : indices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(indexShard.shardsRandomIt());
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
}
}
return new GroupShardsIterator(set);
}
}
private ShardIterator preferenceShardIterator(IndexShardRoutingTable indexShard, String nodeId, @Nullable String preference) {
if (preference == null) {
return indexShard.shardsRandomIt();
}
if ("_local".equals(preference)) {
return indexShard.preferLocalShardsIt(nodeId);
}
if ("_primary".equals(preference)) {
return indexShard.primaryShardIt();
}
// if not, then use it as the index
return indexShard.shardsIt(DjbHashFunction.DJB_HASH(preference));
}
public IndexMetaData indexMetaData(ClusterState clusterState, String index) {
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexMetaData == null) {

View File

@ -59,7 +59,7 @@ public class RestGetAction extends BaseRestHandler {
getRequest.operationThreaded(true);
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing"));
getRequest.preference(request.param("preference"));
String sField = request.param("fields");
if (sField != null) {

View File

@ -144,6 +144,7 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.types(RestActions.splitTypes(request.param("type")));
searchRequest.queryHint(request.param("query_hint"));
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
return searchRequest;
}

View File

@ -114,7 +114,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).indexBoost("test", 1.0f).indexBoost("test2", 2.0f);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -182,7 +182,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -276,7 +276,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
}
Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_AND_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -328,7 +328,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
.facet(FacetBuilders.queryFacet("test1", termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));

View File

@ -122,7 +122,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.DFS_QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -189,7 +189,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.DFS_QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -279,7 +279,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// do this with dfs, since we have uneven distribution of docs between shards
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_AND_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -334,7 +334,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.facet(queryFacet("test1").query(termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search.preference;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@Test
public class SearchPreferenceTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
Settings settings = settingsBuilder().put("number_of_shards", 3).put("number_of_replicas", 1).build();
startNode("server1", settings);
startNode("server2", settings);
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("server1");
}
@Test public void simplePreferenceTests() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(1l));
searchResponse = client.prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(1l));
searchResponse = client.prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(1l));
}
}