Uneven distribution of search requests across shards, closes 1103.

This commit is contained in:
kimchy 2011-07-08 05:32:59 +03:00
parent 9876fa5a76
commit 0642acd9ac
6 changed files with 126 additions and 11 deletions

View File

@ -21,11 +21,15 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.search.type.*;
import org.elasticsearch.action.search.type.TransportSearchCountAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScanAction;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexMissingException;
@ -89,8 +93,8 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
ClusterState clusterState = clusterService.state();
String[] concreteIndices = clusterState.metaData().concreteIndices(searchRequest.indices());
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(searchRequest.routing(), searchRequest.indices());
GroupShardsIterator groupIt = clusterService.operationRouting().searchShards(clusterState, searchRequest.indices(), concreteIndices, searchRequest.queryHint(), routingMap, searchRequest.preference());
if (groupIt.size() == 1) {
int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, searchRequest.indices(), concreteIndices, searchRequest.queryHint(), routingMap, searchRequest.preference());
if (shardCount == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_AND_FETCH);
}

View File

@ -46,5 +46,7 @@ public interface OperationRouting {
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable Set<String> routing) throws IndexMissingException;
int searchShardsCount(ClusterState clusterState, String[] indices, String[] concreteIndices, @Nullable String queryHint, @Nullable Map<String, Set<String>> routing, @Nullable String preference) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, String[] concreteIndices, @Nullable String queryHint, @Nullable Map<String, Set<String>> routing, @Nullable String preference) throws IndexMissingException;
}

View File

@ -100,6 +100,41 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
return new GroupShardsIterator(set);
}
@Override public int searchShardsCount(ClusterState clusterState, String[] indices, String[] concreteIndices, @Nullable String queryHint, @Nullable Map<String, Set<String>> routing, @Nullable String preference) throws IndexMissingException {
if (concreteIndices == null || concreteIndices.length == 0) {
concreteIndices = clusterState.metaData().concreteAllIndices();
}
if (routing != null) {
HashSet<ShardId> set = new HashSet<ShardId>();
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
Set<String> effectiveRouting = routing.get(index);
if (effectiveRouting != null) {
for (String r : effectiveRouting) {
int shardId = shardId(clusterState, index, null, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, its an estimated count? (we just want to know if its 1 or not)
set.add(indexShard.shardId());
}
}
}
return set.size();
} else {
// we use list here since we know we are not going to create duplicates
int count = 0;
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
count++;
}
}
return count;
}
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, String[] concreteIndices, @Nullable String queryHint, @Nullable Map<String, Set<String>> routing, @Nullable String preference) throws IndexMissingException {
if (concreteIndices == null || concreteIndices.length == 0) {
concreteIndices = clusterState.metaData().concreteAllIndices();

View File

@ -308,9 +308,11 @@ public class InternalSearchHit implements SearchHit {
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (explanation() != null) {
builder.field("_shard", shard.shardId());
builder.field("_node", shard.nodeId());
}
builder.field(Fields._INDEX, shard.index());
// builder.field("_shard", shard.shardId());
// builder.field("_node", shard.nodeId());
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
if (version != -1) {

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.structure;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@Test
public class RoutingIteratorTests {
@Test public void testRandomRouting() {
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
.put(newIndexMetaDataBuilder("test2").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test1").initializeEmpty(metaData.index("test1")))
.add(indexRoutingTable("test2").initializeEmpty(metaData.index("test2")))
.build();
ShardIterator shardIterator = routingTable.index("test1").shard(0).shardsRandomIt();
assertThat(shardIterator.hasNext(), equalTo(true));
ShardRouting shardRouting1 = shardIterator.next();
shardIterator = routingTable.index("test1").shard(0).shardsRandomIt();
assertThat(shardIterator.hasNext(), equalTo(true));
ShardRouting shardRouting2 = shardIterator.next();
assertThat(shardRouting1, not(sameInstance(shardRouting2)));
}
}

View File

@ -53,12 +53,27 @@ public class SearchPreferenceTests extends AbstractNodesTests {
return client("server1");
}
@Test public void noPreferenceRandom() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("number_of_shards", 1).put("number_of_replicas", 1)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse = client.prepareSearch("test").setQuery(matchAllQuery()).execute().actionGet();
String firstNodeId = searchResponse.hits().getAt(0).shard().nodeId();
searchResponse = client.prepareSearch("test").setQuery(matchAllQuery()).execute().actionGet();
String secondNodeId = searchResponse.hits().getAt(0).shard().nodeId();
assertThat(firstNodeId, not(equalTo(secondNodeId)));
}
@Test public void simplePreferenceTests() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();