move operation routing to be cluster level and not index level

This commit is contained in:
kimchy 2010-10-19 16:23:39 +02:00
parent c38f07a713
commit 4c8978237f
29 changed files with 133 additions and 146 deletions

View File

@ -62,7 +62,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
return indicesService.searchShards(clusterState, request.indices(), request.queryHint());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
}
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -161,10 +161,10 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
ShardId shardId = null;
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
shardId = indicesService.indexServiceSafe(indexRequest.index()).operationRouting().indexShards(clusterState, indexRequest.type(), indexRequest.id()).shardId();
shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id()).shardId();
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
shardId = indicesService.indexServiceSafe(deleteRequest.index()).operationRouting().deleteShards(clusterState, deleteRequest.type(), deleteRequest.id()).shardId();
shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id()).shardId();
}
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {

View File

@ -76,7 +76,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
return indicesService.searchShards(clusterState, request.indices(), request.queryHint());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
}
@Override protected void checkBlock(CountRequest request, ClusterState state) {

View File

@ -111,7 +111,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {
return indicesService.indexServiceSafe(request.index()).operationRouting()
.deleteShards(clusterService.state(), request.type(), request.id());
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.index(), request.type(), request.id());
}
}

View File

@ -75,7 +75,7 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication
}
@Override protected GroupShardsIterator shards(IndexDeleteByQueryRequest request) {
return indicesService.indexServiceSafe(request.index()).operationRouting().deleteByQueryShards(clusterService.state());
return clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index());
}
@Override protected ShardDeleteByQueryRequest newShardRequestInstance(IndexDeleteByQueryRequest request, int shardId) {

View File

@ -72,7 +72,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
}
@Override protected ShardsIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {
GroupShardsIterator group = indicesService.indexServiceSafe(request.index()).operationRouting().deleteByQueryShards(clusterService.state());
GroupShardsIterator group = clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index());
for (ShardsIterator shards : group) {
if (shards.shardId().id() == request.shardId()) {
return shards;

View File

@ -124,8 +124,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override protected ShardsIterator shards(ClusterState clusterState, IndexRequest request) {
return indicesService.indexServiceSafe(request.index()).operationRouting()
.indexShards(clusterService.state(), request.type(), request.id());
return clusterService.operationRouting()
.indexShards(clusterService.state(), request.index(), request.type(), request.id());
}
@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {

View File

@ -114,7 +114,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
shardsIts = indicesService.searchShards(clusterState, request.indices(), request.queryHint());
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
expectedSuccessfulOps = shardsIts.size();
expectedTotalOps = shardsIts.totalSizeActive();

View File

@ -105,8 +105,8 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
checkBlock(request, clusterState);
this.shardsIt = indicesService.indexServiceSafe(request.index()).operationRouting()
.getShards(clusterState, request.type(), request.id());
this.shardsIt = clusterService.operationRouting()
.getShards(clusterState, request.index(), request.type(), request.id());
}
public void start() {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.cluster.routing.operation.OperationRoutingModule;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
@ -46,7 +47,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
}
@Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new ShardAllocationModule(settings));
return ImmutableList.of(new ShardAllocationModule(settings), new OperationRoutingModule(settings));
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
@ -41,6 +42,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
ClusterState state();
OperationRouting operationRouting();
/**
* Adds a listener for updated cluster states.
*/

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.routing;
package org.elasticsearch.cluster.routing.operation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
@ -28,20 +28,17 @@ import org.elasticsearch.indices.IndexMissingException;
import javax.annotation.Nullable;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface OperationRouting {
ShardsIterator indexShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator deleteShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator getShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator getShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException;
/**
* Returns the shards grouped by shard
*/
GroupShardsIterator deleteByQueryShards(ClusterState clusterState) throws IndexMissingException;
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, @Nullable String queryHint) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint) throws IndexMissingException;
}

View File

@ -17,16 +17,16 @@
* under the License.
*/
package org.elasticsearch.index.routing;
package org.elasticsearch.cluster.routing.operation;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRoutingModule;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.routing.hash.HashFunction;
import org.elasticsearch.index.routing.hash.djb.DjbHashFunction;
import org.elasticsearch.index.routing.plain.PlainOperationRoutingModule;
import static org.elasticsearch.common.inject.Modules.*;
@ -35,17 +35,17 @@ import static org.elasticsearch.common.inject.Modules.*;
*/
public class OperationRoutingModule extends AbstractModule implements SpawnModules {
private final Settings indexSettings;
private final Settings settings;
public OperationRoutingModule(Settings indexSettings) {
this.indexSettings = indexSettings;
public OperationRoutingModule(Settings settings) {
this.settings = settings;
}
@Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(createModule(indexSettings.getAsClass("index.routing.type", PlainOperationRoutingModule.class, "org.elasticsearch.index.routing.", "OperationRoutingModule"), indexSettings));
return ImmutableList.of(createModule(settings.getAsClass("cluster.routing.operation.type", PlainOperationRoutingModule.class, "org.elasticsearch.cluster.routing.operation.", "OperationRoutingModule"), settings));
}
@Override protected void configure() {
bind(HashFunction.class).to(indexSettings.getAsClass("index.routing.hash.type", DjbHashFunction.class, "org.elasticsearch.index.routing.hash.", "HashFunction")).asEagerSingleton();
bind(HashFunction.class).to(settings.getAsClass("cluster.routing.operation.hash.type", DjbHashFunction.class, "org.elasticsearch.cluster.routing.operation.hash.", "HashFunction")).asEagerSingleton();
}
}

View File

@ -17,10 +17,10 @@
* under the License.
*/
package org.elasticsearch.index.routing.hash;
package org.elasticsearch.cluster.routing.operation.hash;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface HashFunction {

View File

@ -17,12 +17,12 @@
* under the License.
*/
package org.elasticsearch.index.routing.hash.djb;
package org.elasticsearch.cluster.routing.operation.hash.djb;
import org.elasticsearch.index.routing.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class DjbHashFunction implements HashFunction {

View File

@ -17,12 +17,12 @@
* under the License.
*/
package org.elasticsearch.index.routing.hash.simple;
package org.elasticsearch.cluster.routing.operation.hash.simple;
import org.elasticsearch.index.routing.hash.HashFunction;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SimpleHashFunction implements HashFunction {

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.routing.plain;
package org.elasticsearch.cluster.routing.operation.plain;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -25,77 +25,82 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.common.collect.IdentityHashSet;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.routing.OperationRouting;
import org.elasticsearch.index.routing.hash.HashFunction;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import javax.annotation.Nullable;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class PlainOperationRouting extends AbstractIndexComponent implements OperationRouting {
public class PlainOperationRouting extends AbstractComponent implements OperationRouting {
private final HashFunction hashFunction;
@Inject public PlainOperationRouting(Index index, @IndexSettings Settings indexSettings, HashFunction hashFunction) {
super(index, indexSettings);
@Inject public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction) {
super(indexSettings);
this.hashFunction = hashFunction;
}
@Override public ShardsIterator indexShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, type, id).shardsIt();
@Override public ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id).shardsIt();
}
@Override public ShardsIterator deleteShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, type, id).shardsIt();
@Override public ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id).shardsIt();
}
@Override public ShardsIterator getShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, type, id).shardsRandomIt();
@Override public ShardsIterator getShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id).shardsRandomIt();
}
@Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState) throws IndexMissingException {
return indexRoutingTable(clusterState).groupByShardsIt();
@Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index) throws IndexMissingException {
return indexRoutingTable(clusterState, index).groupByShardsIt();
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint) throws IndexMissingException {
if (indices == null || indices.length == 0) {
indices = clusterState.metaData().concreteAllIndices();
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, @Nullable String queryHint) throws IndexMissingException {
IdentityHashSet<ShardsIterator> set = new IdentityHashSet<ShardsIterator>();
IndexRoutingTable indexRouting = indexRoutingTable(clusterState);
for (String index : indices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(indexShard.shardsRandomIt());
}
}
return new GroupShardsIterator(set);
}
public IndexMetaData indexMetaData(ClusterState clusterState) {
IndexMetaData indexMetaData = clusterState.metaData().index(index.name());
public IndexMetaData indexMetaData(ClusterState clusterState, String index) {
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexMetaData == null) {
throw new IndexMissingException(index);
throw new IndexMissingException(new Index(index));
}
return indexMetaData;
}
protected IndexRoutingTable indexRoutingTable(ClusterState clusterState) {
IndexRoutingTable indexRouting = clusterState.routingTable().index(index.name());
protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) {
IndexRoutingTable indexRouting = clusterState.routingTable().index(index);
if (indexRouting == null) {
throw new IndexMissingException(index);
throw new IndexMissingException(new Index(index));
}
return indexRouting;
}
protected IndexShardRoutingTable shards(ClusterState clusterState, String type, String id) {
int shardId = Math.abs(hash(type, id)) % indexMetaData(clusterState).numberOfShards();
IndexShardRoutingTable indexShard = indexRoutingTable(clusterState).shard(shardId);
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String type, String id) {
int shardId = Math.abs(hash(type, id)) % indexMetaData(clusterState, index).numberOfShards();
IndexShardRoutingTable indexShard = indexRoutingTable(clusterState, index).shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}

View File

@ -17,10 +17,10 @@
* under the License.
*/
package org.elasticsearch.index.routing.plain;
package org.elasticsearch.cluster.routing.operation.plain;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.routing.OperationRouting;
/**
* @author kimchy (Shay Banon)

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -58,6 +59,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final DiscoveryService discoveryService;
private final OperationRouting operationRouting;
private final TransportService transportService;
private volatile ExecutorService updateTasksExecutor;
@ -68,9 +71,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ClusterState clusterState = newClusterStateBuilder().build();
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool,
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, ThreadPool threadPool,
TimerService timerService) {
super(settings);
this.operationRouting = operationRouting;
this.transportService = transportService;
this.discoveryService = discoveryService;
this.threadPool = threadPool;
@ -102,6 +106,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return discoveryService.localNode();
}
@Override public OperationRouting operationRouting() {
return operationRouting;
}
public ClusterState state() {
return this.clusterState;
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.routing.OperationRouting;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
@ -46,8 +45,6 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
IndexCache cache();
OperationRouting operationRouting();
MapperService mapperService();
IndexQueryParserService queryParserService();

View File

@ -43,7 +43,6 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.routing.OperationRouting;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardManagement;
import org.elasticsearch.index.shard.IndexShardModule;
@ -100,8 +99,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final IndexStore indexStore;
private final OperationRouting operationRouting;
private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of();
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
@ -110,7 +107,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool,
MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, OperationRouting operationRouting) {
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore) {
super(index, indexSettings);
this.injector = injector;
this.threadPool = threadPool;
@ -122,7 +119,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
this.indexEngine = indexEngine;
this.indexGateway = indexGateway;
this.indexStore = indexStore;
this.operationRouting = operationRouting;
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
@ -174,10 +170,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexCache;
}
@Override public OperationRouting operationRouting() {
return operationRouting;
}
@Override public MapperService mapperService() {
return mapperService;
}

View File

@ -20,8 +20,6 @@
package org.elasticsearch.indices;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
@ -52,13 +50,6 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
IndexService indexServiceSafe(String index) throws IndexMissingException;
/**
* Gets all the "searchable" shards on all the given indices.
*
* @see org.elasticsearch.index.routing.OperationRouting#searchShards(org.elasticsearch.cluster.ClusterState, String)
*/
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, String queryHint) throws ElasticSearchException;
IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticSearchException;
void deleteIndex(String index) throws ElasticSearchException;

View File

@ -23,8 +23,6 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.FieldCache;
import org.apache.lucene.search.IndexReaderPurgedListener;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.UnmodifiableIterator;
@ -48,7 +46,6 @@ import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexGatewayModule;
import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.query.IndexQueryParserModule;
import org.elasticsearch.index.routing.OperationRoutingModule;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.shard.service.IndexShard;
@ -197,18 +194,6 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
return indexService;
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indexNames, String queryHint) throws ElasticSearchException {
if (indexNames == null || indexNames.length == 0) {
ImmutableMap<String, IndexService> indices = this.indices;
indexNames = indices.keySet().toArray(new String[indices.keySet().size()]);
}
GroupShardsIterator its = new GroupShardsIterator();
for (String index : indexNames) {
its.add(indexServiceSafe(index).operationRouting().searchShards(clusterState, queryHint));
}
return its;
}
public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticSearchException {
Index index = new Index(sIndexName);
if (indicesInjectors.containsKey(index.name())) {
@ -240,7 +225,6 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
modules.add(new IndexQueryParserModule(indexSettings));
modules.add(new MapperServiceModule());
modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class)));
modules.add(new OperationRoutingModule(indexSettings));
modules.add(new IndexModule());
Injector indexInjector = modules.createChildInjector(injector);

View File

@ -55,7 +55,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SingleInstanceEmbeddedSearchTests extends AbstractNodesTests {

View File

@ -137,6 +137,7 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests {
@Test public void testQueryThenFetch() throws Exception {
SearchSourceBuilder source = searchSource()
.query(termQuery("multi", "test"))
.sort("nid", SortOrder.DESC) // we have to sort here to have some ordering with dist scoring
.from(0).size(60).explain(true);
SearchResponse searchResponse = client.search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(new Scroll(timeValueMinutes(10)))).actionGet();
@ -191,6 +192,11 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests {
.query(termQuery("multi", "test"))
.from(0).size(20).explain(true);
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
expectedIds.add(Integer.toString(i));
}
SearchResponse searchResponse = client.search(searchRequest("test").source(source).searchType(QUERY_AND_FETCH).scroll(new Scroll(timeValueMinutes(10)))).actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.shardFailures()), searchResponse.shardFailures().length, equalTo(0));
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
@ -199,17 +205,15 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests {
SearchHit hit = searchResponse.hits().hits()[i];
// System.out.println(hit.shard() + ": " + hit.explanation());
assertThat(hit.explanation(), notNullValue());
assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
// we can't really check here, since its query and fetch, and not controlling distribution
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
}
searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(40));
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 40; i++) {
expectedIds.add(Integer.toString(i));
}
for (int i = 0; i < 40; i++) {
SearchHit hit = searchResponse.hits().hits()[i];
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - 60 - 1 - i)));
@ -224,6 +228,12 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests {
.query(termQuery("multi", "test"))
.from(0).size(20).explain(true);
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
expectedIds.add(Integer.toString(i));
}
SearchResponse searchResponse = client.search(searchRequest("test").source(source).searchType(DFS_QUERY_AND_FETCH).scroll(new Scroll(timeValueMinutes(10)))).actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.shardFailures()), searchResponse.shardFailures().length, equalTo(0));
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
@ -232,17 +242,14 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests {
SearchHit hit = searchResponse.hits().hits()[i];
// System.out.println(hit.shard() + ": " + hit.explanation());
assertThat(hit.explanation(), notNullValue());
assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
}
searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(40));
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 40; i++) {
expectedIds.add(Integer.toString(i));
}
for (int i = 0; i < 40; i++) {
SearchHit hit = searchResponse.hits().hits()[i];
// System.out.println(hit.shard() + ": " + hit.explanation());
@ -325,6 +332,7 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests {
}
return jsonBuilder().startObject()
.field("id", id)
.field("nid", Integer.parseInt(id))
.field("name", nameValue + id)
.field("age", age)
.field("multi", multi.toString())

View File

@ -114,7 +114,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).indexBoost("test", 1.0f).indexBoost("test2", 2.0f);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -182,7 +182,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -270,8 +270,13 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.query(termQuery("multi", "test"))
.from(0).size(20).explain(true);
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
expectedIds.add(Integer.toString(i));
}
Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = newHashMap();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -289,7 +294,8 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
for (int i = 0; i < 60; i++) {
SearchHit hit = hits.hits()[i];
// System.out.println(hit.id() + " " + hit.explanation());
assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
}
// scrolling with query+fetch is not perfect when it comes to dist sorting
@ -304,10 +310,6 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
hits = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults).hits();
assertThat(hits.totalHits(), equalTo(100l));
assertThat(hits.hits().length, equalTo(40));
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 40; i++) {
expectedIds.add(Integer.toString(i));
}
for (int i = 0; i < 40; i++) {
SearchHit hit = hits.hits()[i];
// System.out.println(hit.id() + " " + hit.explanation());
@ -326,7 +328,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.facet(FacetBuilders.queryFacet("test1", termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -370,6 +372,6 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
for (int i = 0; i < age; i++) {
multi.append(" ").append(nameValue);
}
return "{ type1 : { \"id\" : \"" + id + "\", \"name\" : \"" + (nameValue + id) + "\", age : " + age + ", multi : \"" + multi.toString() + "\", _boost : " + (age * 10) + " } }";
return "{ type1 : { \"id\" : \"" + id + "\", \"nid\" : " + id + ", \"name\" : \"" + (nameValue + id) + "\", age : " + age + ", multi : \"" + multi.toString() + "\", _boost : " + (age * 10) + " } }";
}
}

View File

@ -24,16 +24,14 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRouting;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.routing.OperationRouting;
import org.elasticsearch.index.routing.plain.PlainOperationRouting;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.search.*;
@ -73,7 +71,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNodesTests {
@ -120,7 +118,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -187,7 +185,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -277,7 +275,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// do this with dfs, since we have uneven distribution of docs between shards
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -332,7 +330,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.facet(queryFacet("test1").query(termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -392,8 +390,8 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
*/
public static class UnevenOperationRoutingStrategy extends PlainOperationRouting {
@Inject public UnevenOperationRoutingStrategy(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings, null);
@Inject public UnevenOperationRoutingStrategy(Settings settings) {
super(settings, null);
}
@Override protected int hash(String type, String id) {

View File

@ -1,8 +1,8 @@
cluster:
routing:
schedule: 100ms
operation:
type: org.elasticsearch.test.integration.search.TwoInstanceUnbalancedShardsEmbeddedSearchTests$UnevenOperationRoutingModule
index:
number_of_shards: 3
number_of_replicas: 0
routing:
type: org.elasticsearch.test.integration.search.TwoInstanceUnbalancedShardsEmbeddedSearchTests$UnevenOperationRoutingModule

View File

@ -78,7 +78,7 @@ public class HighlightSearchTests extends AbstractNodesTests {
@Test public void testSimpleHighlighting() throws Exception {
SearchResponse searchResponse = client.prepareSearch()
.setIndices("test")
.setSearchType(QUERY_THEN_FETCH)
.setSearchType(DFS_QUERY_THEN_FETCH)
.setQuery(termQuery("_all", "test"))
.setFrom(0).setSize(60)
.addHighlightedField("_all").setHighlighterOrder("score").setHighlighterPreTags("<xxx>").setHighlighterPostTags("</xxx>")