diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java index 2088a851e98..b3b638fad80 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java @@ -62,7 +62,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct } @Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) { - return indicesService.searchShards(clusterState, request.indices(), request.queryHint()); + return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint()); } @Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 0da4f17ff32..a41061b0212 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -161,10 +161,10 @@ public class TransportBulkAction extends BaseAction { ShardId shardId = null; if (request instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) request; - shardId = indicesService.indexServiceSafe(indexRequest.index()).operationRouting().indexShards(clusterState, indexRequest.type(), indexRequest.id()).shardId(); + shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id()).shardId(); } else if (request instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) request; - shardId = indicesService.indexServiceSafe(deleteRequest.index()).operationRouting().deleteShards(clusterState, deleteRequest.type(), deleteRequest.id()).shardId(); + shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id()).shardId(); } List list = requestsByShard.get(shardId); if (list == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index 617c6d31fc7..91b117ffb71 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -76,7 +76,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction spawnModules() { - return ImmutableList.of(new ShardAllocationModule(settings)); + return ImmutableList.of(new ShardAllocationModule(settings), new OperationRoutingModule(settings)); } @Override diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java index d374787991b..645967804a8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.unit.TimeValue; @@ -41,6 +42,9 @@ public interface ClusterService extends LifecycleComponent { */ ClusterState state(); + OperationRouting operationRouting(); + + /** * Adds a listener for updated cluster states. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/OperationRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java similarity index 73% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/OperationRouting.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java index 7da54928cec..8f903025d30 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/OperationRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.routing; +package org.elasticsearch.cluster.routing.operation; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -28,20 +28,17 @@ import org.elasticsearch.indices.IndexMissingException; import javax.annotation.Nullable; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public interface OperationRouting { - ShardsIterator indexShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException; + ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException; - ShardsIterator deleteShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException; + ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException; - ShardsIterator getShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException; + ShardsIterator getShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException; - /** - * Returns the shards grouped by shard - */ - GroupShardsIterator deleteByQueryShards(ClusterState clusterState) throws IndexMissingException; + GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index) throws IndexMissingException; - GroupShardsIterator searchShards(ClusterState clusterState, @Nullable String queryHint) throws IndexMissingException; + GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint) throws IndexMissingException; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/OperationRoutingModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRoutingModule.java similarity index 62% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/OperationRoutingModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRoutingModule.java index a8314de9fe8..95eeb106bb4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/OperationRoutingModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRoutingModule.java @@ -17,16 +17,16 @@ * under the License. */ -package org.elasticsearch.index.routing; +package org.elasticsearch.cluster.routing.operation; +import org.elasticsearch.cluster.routing.operation.hash.HashFunction; +import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction; +import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRoutingModule; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.routing.hash.HashFunction; -import org.elasticsearch.index.routing.hash.djb.DjbHashFunction; -import org.elasticsearch.index.routing.plain.PlainOperationRoutingModule; import static org.elasticsearch.common.inject.Modules.*; @@ -35,17 +35,17 @@ import static org.elasticsearch.common.inject.Modules.*; */ public class OperationRoutingModule extends AbstractModule implements SpawnModules { - private final Settings indexSettings; + private final Settings settings; - public OperationRoutingModule(Settings indexSettings) { - this.indexSettings = indexSettings; + public OperationRoutingModule(Settings settings) { + this.settings = settings; } @Override public Iterable spawnModules() { - return ImmutableList.of(createModule(indexSettings.getAsClass("index.routing.type", PlainOperationRoutingModule.class, "org.elasticsearch.index.routing.", "OperationRoutingModule"), indexSettings)); + return ImmutableList.of(createModule(settings.getAsClass("cluster.routing.operation.type", PlainOperationRoutingModule.class, "org.elasticsearch.cluster.routing.operation.", "OperationRoutingModule"), settings)); } @Override protected void configure() { - bind(HashFunction.class).to(indexSettings.getAsClass("index.routing.hash.type", DjbHashFunction.class, "org.elasticsearch.index.routing.hash.", "HashFunction")).asEagerSingleton(); + bind(HashFunction.class).to(settings.getAsClass("cluster.routing.operation.hash.type", DjbHashFunction.class, "org.elasticsearch.cluster.routing.operation.hash.", "HashFunction")).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/HashFunction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/HashFunction.java similarity index 90% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/HashFunction.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/HashFunction.java index 35c87f745f9..f2b6efbff6d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/HashFunction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/HashFunction.java @@ -17,10 +17,10 @@ * under the License. */ -package org.elasticsearch.index.routing.hash; +package org.elasticsearch.cluster.routing.operation.hash; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public interface HashFunction { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/djb/DjbHashFunction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/djb/DjbHashFunction.java similarity index 88% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/djb/DjbHashFunction.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/djb/DjbHashFunction.java index a63286ecfdd..e55f5d4f245 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/djb/DjbHashFunction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/djb/DjbHashFunction.java @@ -17,12 +17,12 @@ * under the License. */ -package org.elasticsearch.index.routing.hash.djb; +package org.elasticsearch.cluster.routing.operation.hash.djb; -import org.elasticsearch.index.routing.hash.HashFunction; +import org.elasticsearch.cluster.routing.operation.hash.HashFunction; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class DjbHashFunction implements HashFunction { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/simple/SimpleHashFunction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/simple/SimpleHashFunction.java similarity index 85% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/simple/SimpleHashFunction.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/simple/SimpleHashFunction.java index afd4b26be78..d8642a12ceb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/hash/simple/SimpleHashFunction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/hash/simple/SimpleHashFunction.java @@ -17,12 +17,12 @@ * under the License. */ -package org.elasticsearch.index.routing.hash.simple; +package org.elasticsearch.cluster.routing.operation.hash.simple; -import org.elasticsearch.index.routing.hash.HashFunction; +import org.elasticsearch.cluster.routing.operation.hash.HashFunction; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SimpleHashFunction implements HashFunction { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/plain/PlainOperationRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java similarity index 60% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/plain/PlainOperationRouting.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java index 5aef8e1a963..b9ef6e2b6d8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/plain/PlainOperationRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.routing.plain; +package org.elasticsearch.cluster.routing.operation.plain; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -25,77 +25,82 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.operation.OperationRouting; +import org.elasticsearch.cluster.routing.operation.hash.HashFunction; import org.elasticsearch.common.collect.IdentityHashSet; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexShardMissingException; -import org.elasticsearch.index.routing.OperationRouting; -import org.elasticsearch.index.routing.hash.HashFunction; -import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexMissingException; import javax.annotation.Nullable; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ -public class PlainOperationRouting extends AbstractIndexComponent implements OperationRouting { +public class PlainOperationRouting extends AbstractComponent implements OperationRouting { private final HashFunction hashFunction; - @Inject public PlainOperationRouting(Index index, @IndexSettings Settings indexSettings, HashFunction hashFunction) { - super(index, indexSettings); + @Inject public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction) { + super(indexSettings); this.hashFunction = hashFunction; } - @Override public ShardsIterator indexShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException { - return shards(clusterState, type, id).shardsIt(); + @Override public ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException { + return shards(clusterState, index, type, id).shardsIt(); } - @Override public ShardsIterator deleteShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException { - return shards(clusterState, type, id).shardsIt(); + @Override public ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException { + return shards(clusterState, index, type, id).shardsIt(); } - @Override public ShardsIterator getShards(ClusterState clusterState, String type, String id) throws IndexMissingException, IndexShardMissingException { - return shards(clusterState, type, id).shardsRandomIt(); + @Override public ShardsIterator getShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException { + return shards(clusterState, index, type, id).shardsRandomIt(); } - @Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState) throws IndexMissingException { - return indexRoutingTable(clusterState).groupByShardsIt(); + @Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index) throws IndexMissingException { + return indexRoutingTable(clusterState, index).groupByShardsIt(); } - @Override public GroupShardsIterator searchShards(ClusterState clusterState, @Nullable String queryHint) throws IndexMissingException { + @Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint) throws IndexMissingException { + if (indices == null || indices.length == 0) { + indices = clusterState.metaData().concreteAllIndices(); + } + IdentityHashSet set = new IdentityHashSet(); - IndexRoutingTable indexRouting = indexRoutingTable(clusterState); - for (IndexShardRoutingTable indexShard : indexRouting) { - set.add(indexShard.shardsRandomIt()); + for (String index : indices) { + IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index); + for (IndexShardRoutingTable indexShard : indexRouting) { + set.add(indexShard.shardsRandomIt()); + } } return new GroupShardsIterator(set); } - public IndexMetaData indexMetaData(ClusterState clusterState) { - IndexMetaData indexMetaData = clusterState.metaData().index(index.name()); + public IndexMetaData indexMetaData(ClusterState clusterState, String index) { + IndexMetaData indexMetaData = clusterState.metaData().index(index); if (indexMetaData == null) { - throw new IndexMissingException(index); + throw new IndexMissingException(new Index(index)); } return indexMetaData; } - protected IndexRoutingTable indexRoutingTable(ClusterState clusterState) { - IndexRoutingTable indexRouting = clusterState.routingTable().index(index.name()); + protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) { + IndexRoutingTable indexRouting = clusterState.routingTable().index(index); if (indexRouting == null) { - throw new IndexMissingException(index); + throw new IndexMissingException(new Index(index)); } return indexRouting; } - protected IndexShardRoutingTable shards(ClusterState clusterState, String type, String id) { - int shardId = Math.abs(hash(type, id)) % indexMetaData(clusterState).numberOfShards(); - IndexShardRoutingTable indexShard = indexRoutingTable(clusterState).shard(shardId); + protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String type, String id) { + int shardId = Math.abs(hash(type, id)) % indexMetaData(clusterState, index).numberOfShards(); + IndexShardRoutingTable indexShard = indexRoutingTable(clusterState, index).shard(shardId); if (indexShard == null) { throw new IndexShardMissingException(new ShardId(index, shardId)); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/plain/PlainOperationRoutingModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRoutingModule.java similarity index 89% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/plain/PlainOperationRoutingModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRoutingModule.java index c8b5c888d85..dde4129061f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/routing/plain/PlainOperationRoutingModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRoutingModule.java @@ -17,10 +17,10 @@ * under the License. */ -package org.elasticsearch.index.routing.plain; +package org.elasticsearch.cluster.routing.operation.plain; +import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.index.routing.OperationRouting; /** * @author kimchy (Shay Banon) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index c187aceb635..d30dc5ced1b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -58,6 +59,8 @@ public class InternalClusterService extends AbstractLifecycleComponent, Clos IndexCache cache(); - OperationRouting operationRouting(); - MapperService mapperService(); IndexQueryParserService queryParserService(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index c7fd218eac4..70146cec587 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -43,7 +43,6 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; import org.elasticsearch.index.query.IndexQueryParserService; -import org.elasticsearch.index.routing.OperationRouting; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShardManagement; import org.elasticsearch.index.shard.IndexShardModule; @@ -100,8 +99,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final IndexStore indexStore; - private final OperationRouting operationRouting; - private volatile ImmutableMap shardsInjectors = ImmutableMap.of(); private volatile ImmutableMap shards = ImmutableMap.of(); @@ -110,7 +107,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde @Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, - IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, OperationRouting operationRouting) { + IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore) { super(index, indexSettings); this.injector = injector; this.threadPool = threadPool; @@ -122,7 +119,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde this.indexEngine = indexEngine; this.indexGateway = indexGateway; this.indexStore = indexStore; - this.operationRouting = operationRouting; this.pluginsService = injector.getInstance(PluginsService.class); this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class); @@ -174,10 +170,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexCache; } - @Override public OperationRouting operationRouting() { - return operationRouting; - } - @Override public MapperService mapperService() { return mapperService; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java index e5ec26d175d..d77c3460a6f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -20,8 +20,6 @@ package org.elasticsearch.indices; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadSafe; @@ -52,13 +50,6 @@ public interface IndicesService extends Iterable, LifecycleCompone IndexService indexServiceSafe(String index) throws IndexMissingException; - /** - * Gets all the "searchable" shards on all the given indices. - * - * @see org.elasticsearch.index.routing.OperationRouting#searchShards(org.elasticsearch.cluster.ClusterState, String) - */ - GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, String queryHint) throws ElasticSearchException; - IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticSearchException; void deleteIndex(String index) throws ElasticSearchException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 17ec3447555..423089cedbf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -23,8 +23,6 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.FieldCache; import org.apache.lucene.search.IndexReaderPurgedListener; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.UnmodifiableIterator; @@ -48,7 +46,6 @@ import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexGatewayModule; import org.elasticsearch.index.mapper.MapperServiceModule; import org.elasticsearch.index.query.IndexQueryParserModule; -import org.elasticsearch.index.routing.OperationRoutingModule; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.shard.service.IndexShard; @@ -197,18 +194,6 @@ public class InternalIndicesService extends AbstractLifecycleComponent indices = this.indices; - indexNames = indices.keySet().toArray(new String[indices.keySet().size()]); - } - GroupShardsIterator its = new GroupShardsIterator(); - for (String index : indexNames) { - its.add(indexServiceSafe(index).operationRouting().searchShards(clusterState, queryHint)); - } - return its; - } - public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticSearchException { Index index = new Index(sIndexName); if (indicesInjectors.containsKey(index.name())) { @@ -240,7 +225,6 @@ public class InternalIndicesService extends AbstractLifecycleComponent expectedIds = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + expectedIds.add(Integer.toString(i)); + } + SearchResponse searchResponse = client.search(searchRequest("test").source(source).searchType(QUERY_AND_FETCH).scroll(new Scroll(timeValueMinutes(10)))).actionGet(); assertThat("Failures " + Arrays.toString(searchResponse.shardFailures()), searchResponse.shardFailures().length, equalTo(0)); assertThat(searchResponse.hits().totalHits(), equalTo(100l)); @@ -199,17 +205,15 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests { SearchHit hit = searchResponse.hits().hits()[i]; // System.out.println(hit.shard() + ": " + hit.explanation()); assertThat(hit.explanation(), notNullValue()); - assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1))); + // we can't really check here, since its query and fetch, and not controlling distribution +// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1))); + assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue()); } searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet(); assertThat(searchResponse.hits().totalHits(), equalTo(100l)); assertThat(searchResponse.hits().hits().length, equalTo(40)); - Set expectedIds = Sets.newHashSet(); - for (int i = 0; i < 40; i++) { - expectedIds.add(Integer.toString(i)); - } for (int i = 0; i < 40; i++) { SearchHit hit = searchResponse.hits().hits()[i]; // assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - 60 - 1 - i))); @@ -224,6 +228,12 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests { .query(termQuery("multi", "test")) .from(0).size(20).explain(true); + Set expectedIds = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + expectedIds.add(Integer.toString(i)); + } + + SearchResponse searchResponse = client.search(searchRequest("test").source(source).searchType(DFS_QUERY_AND_FETCH).scroll(new Scroll(timeValueMinutes(10)))).actionGet(); assertThat("Failures " + Arrays.toString(searchResponse.shardFailures()), searchResponse.shardFailures().length, equalTo(0)); assertThat(searchResponse.hits().totalHits(), equalTo(100l)); @@ -232,17 +242,14 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests { SearchHit hit = searchResponse.hits().hits()[i]; // System.out.println(hit.shard() + ": " + hit.explanation()); assertThat(hit.explanation(), notNullValue()); - assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1))); +// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1))); + assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue()); } searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet(); assertThat(searchResponse.hits().totalHits(), equalTo(100l)); assertThat(searchResponse.hits().hits().length, equalTo(40)); - Set expectedIds = Sets.newHashSet(); - for (int i = 0; i < 40; i++) { - expectedIds.add(Integer.toString(i)); - } for (int i = 0; i < 40; i++) { SearchHit hit = searchResponse.hits().hits()[i]; // System.out.println(hit.shard() + ": " + hit.explanation()); @@ -325,6 +332,7 @@ public class TransportTwoServersSearchTests extends AbstractNodesTests { } return jsonBuilder().startObject() .field("id", id) + .field("nid", Integer.parseInt(id)) .field("name", nameValue + id) .field("age", age) .field("multi", multi.toString()) diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java index 9ca61145a6b..23bd28f058d 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java @@ -114,7 +114,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { .from(0).size(60).explain(true).indexBoost("test", 1.0f).indexBoost("test2", 2.0f); List dfsResults = newArrayList(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -182,7 +182,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { .from(0).size(60).explain(true).sort("age", SortOrder.ASC); List dfsResults = newArrayList(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -270,8 +270,13 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { .query(termQuery("multi", "test")) .from(0).size(20).explain(true); + Set expectedIds = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + expectedIds.add(Integer.toString(i)); + } + Map queryFetchResults = newHashMap(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -289,7 +294,8 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { for (int i = 0; i < 60; i++) { SearchHit hit = hits.hits()[i]; // System.out.println(hit.id() + " " + hit.explanation()); - assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1))); +// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1))); + assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue()); } // scrolling with query+fetch is not perfect when it comes to dist sorting @@ -304,10 +310,6 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { hits = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults).hits(); assertThat(hits.totalHits(), equalTo(100l)); assertThat(hits.hits().length, equalTo(40)); - Set expectedIds = Sets.newHashSet(); - for (int i = 0; i < 40; i++) { - expectedIds.add(Integer.toString(i)); - } for (int i = 0; i < 40; i++) { SearchHit hit = hits.hits()[i]; // System.out.println(hit.id() + " " + hit.explanation()); @@ -326,7 +328,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { .facet(FacetBuilders.queryFacet("test1", termQuery("name", "test1"))); Map queryResults = newHashMap(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -370,6 +372,6 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { for (int i = 0; i < age; i++) { multi.append(" ").append(nameValue); } - return "{ type1 : { \"id\" : \"" + id + "\", \"name\" : \"" + (nameValue + id) + "\", age : " + age + ", multi : \"" + multi.toString() + "\", _boost : " + (age * 10) + " } }"; + return "{ type1 : { \"id\" : \"" + id + "\", \"nid\" : " + id + ", \"name\" : \"" + (nameValue + id) + "\", age : " + age + ", multi : \"" + multi.toString() + "\", _boost : " + (age * 10) + " } }"; } } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java index aca9b64c34c..11e9ea28f97 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java @@ -24,16 +24,14 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.operation.OperationRouting; +import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRouting; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.routing.OperationRouting; -import org.elasticsearch.index.routing.plain.PlainOperationRouting; -import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.search.*; @@ -73,7 +71,7 @@ import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNodesTests { @@ -120,7 +118,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode .from(0).size(60).explain(true); List dfsResults = newArrayList(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -187,7 +185,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode .from(0).size(60).explain(true).sort("age", SortOrder.ASC); List dfsResults = newArrayList(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -277,7 +275,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode // do this with dfs, since we have uneven distribution of docs between shards List dfsResults = newArrayList(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -332,7 +330,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode .facet(queryFacet("test1").query(termQuery("name", "test1"))); Map queryResults = newHashMap(); - for (ShardsIterator shardsIt : indicesService.searchShards(clusterService.state(), new String[]{"test"}, null)) { + for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) { for (ShardRouting shardRouting : shardsIt) { InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder) .scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES))); @@ -392,8 +390,8 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode */ public static class UnevenOperationRoutingStrategy extends PlainOperationRouting { - @Inject public UnevenOperationRoutingStrategy(Index index, @IndexSettings Settings indexSettings) { - super(index, indexSettings, null); + @Inject public UnevenOperationRoutingStrategy(Settings settings) { + super(settings, null); } @Override protected int hash(String type, String id) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.yml b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.yml index cfae48e0d27..c052d039521 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.yml +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.yml @@ -1,8 +1,8 @@ cluster: routing: schedule: 100ms + operation: + type: org.elasticsearch.test.integration.search.TwoInstanceUnbalancedShardsEmbeddedSearchTests$UnevenOperationRoutingModule index: number_of_shards: 3 number_of_replicas: 0 - routing: - type: org.elasticsearch.test.integration.search.TwoInstanceUnbalancedShardsEmbeddedSearchTests$UnevenOperationRoutingModule diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/highlight/HighlightSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/highlight/HighlightSearchTests.java index 0e9dbe0697d..6288e49acb8 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/highlight/HighlightSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/highlight/HighlightSearchTests.java @@ -78,7 +78,7 @@ public class HighlightSearchTests extends AbstractNodesTests { @Test public void testSimpleHighlighting() throws Exception { SearchResponse searchResponse = client.prepareSearch() .setIndices("test") - .setSearchType(QUERY_THEN_FETCH) + .setSearchType(DFS_QUERY_THEN_FETCH) .setQuery(termQuery("_all", "test")) .setFrom(0).setSize(60) .addHighlightedField("_all").setHighlighterOrder("score").setHighlighterPreTags("").setHighlighterPostTags("")