Add support for filtering aliases to count

This commit is contained in:
Igor Motov 2011-05-24 21:03:11 -04:00 committed by kimchy
parent b76f5150ee
commit b979af109c
14 changed files with 85 additions and 34 deletions

View File

@ -64,8 +64,8 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
return new BroadcastPingRequest();
}
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), null, null);
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, concreteIndices, request.queryHint(), null, null);
}
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -142,7 +142,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
/**
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(ClearIndicesCacheRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
@Override protected GroupShardsIterator shards(ClearIndicesCacheRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
}

View File

@ -118,7 +118,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
/**
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(FlushRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
@Override protected GroupShardsIterator shards(FlushRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
}

View File

@ -116,7 +116,7 @@ public class TransportGatewaySnapshotAction extends TransportBroadcastOperationA
/**
* The snapshot request works against all primary shards.
*/
@Override protected GroupShardsIterator shards(GatewaySnapshotRequest request, ClusterState clusterState) {
return clusterState.routingTable().primaryShardsGrouped(request.indices());
@Override protected GroupShardsIterator shards(GatewaySnapshotRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().primaryShardsGrouped(concreteIndices);
}
}

View File

@ -129,7 +129,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
/**
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(OptimizeRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
@Override protected GroupShardsIterator shards(OptimizeRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
}

View File

@ -119,7 +119,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
/**
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(RefreshRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
@Override protected GroupShardsIterator shards(RefreshRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
}

View File

@ -92,8 +92,8 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
/**
* Status goes across *all* shards.
*/
@Override protected GroupShardsIterator shards(IndicesStatusRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
@Override protected GroupShardsIterator shards(IndicesStatusRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
/**

View File

@ -43,11 +43,13 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String queryParserName;
@Nullable private String[] filteringAliases;
ShardCountRequest() {
}
public ShardCountRequest(String index, int shardId, CountRequest request) {
public ShardCountRequest(String index, int shardId, @Nullable String[] filteringAliases, CountRequest request) {
super(index, shardId);
this.minScore = request.minScore();
this.querySource = request.querySource();
@ -55,6 +57,7 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
this.querySourceLength = request.querySourceLength();
this.queryParserName = request.queryParserName();
this.types = request.types();
this.filteringAliases = filteringAliases;
}
public float minScore() {
@ -81,6 +84,10 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
return this.types;
}
public String[] filteringAliases() {
return filteringAliases;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minScore = in.readFloat();
@ -98,6 +105,13 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
types[i] = in.readUTF();
}
}
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
filteringAliases = new String[aliasesSize];
for (int i = 0; i < aliasesSize; i++) {
filteringAliases[i] = in.readUTF();
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -115,5 +129,13 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
for (String type : types) {
out.writeUTF(type);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);
for (String alias : filteringAliases) {
out.writeUTF(alias);
}
} else {
out.writeVInt(0);
}
}
}

View File

@ -75,19 +75,20 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override protected ShardCountRequest newShardRequest(ShardRouting shard, CountRequest request) {
return new ShardCountRequest(shard.index(), shard.id(), request);
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());
return new ShardCountRequest(shard.index(), shard.id(), filteringAliases, request);
}
@Override protected ShardCountResponse newShardResponse() {
return new ShardCountResponse();
}
@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing(), null);
@Override protected GroupShardsIterator shards(CountRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, concreteIndices, request.queryHint(), request.routing(), null);
}
@Override protected void checkBlock(CountRequest request, ClusterState state) {
for (String index : request.indices()) {
@Override protected void checkBlock(CountRequest request, String[] concreteIndices, ClusterState state) {
for (String index : concreteIndices) {
state.blocks().indexBlocked(ClusterBlockLevel.READ, index);
}
}
@ -118,7 +119,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
long count = indexShard.count(request.minScore(), request.querySource(), request.querySourceOffset(), request.querySourceLength(),
request.queryParserName(), request.types());
request.queryParserName(), request.filteringAliases(), request.types());
return new ShardCountResponse(request.index(), request.shardId(), count);
}
}

View File

@ -89,7 +89,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract ShardResponse shardOperation(ShardRequest request) throws ElasticSearchException;
protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);
protected abstract GroupShardsIterator shards(Request request, String[] concreteIndices, ClusterState clusterState);
/**
* Allows to override how shard routing is iterated over. Default implementation uses
@ -119,7 +119,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
return false;
}
protected void checkBlock(Request request, ClusterState state) {
protected void checkBlock(Request request, String[] indices, ClusterState state) {
}
@ -143,6 +143,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
private final AtomicReferenceArray shardsResponses;
private final String[] concreteIndices;
AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
@ -150,11 +152,11 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
clusterState = clusterService.state();
// update to concrete indices
request.indices(clusterState.metaData().concreteIndices(request.indices()));
checkBlock(request, clusterState);
concreteIndices = clusterState.metaData().concreteIndices(request.indices());
checkBlock(request, concreteIndices, clusterState);
nodes = clusterState.nodes();
shardsIts = shards(request, clusterState);
shardsIts = shards(request, concreteIndices, clusterState);
expectedOps = shardsIts.size();

View File

@ -69,8 +69,12 @@ public class Lucene {
}
public static long count(IndexSearcher searcher, Query query, float minScore) throws IOException {
return count(searcher, query, null, minScore);
}
public static long count(IndexSearcher searcher, Query query, Filter filter, float minScore) throws IOException {
CountCollector countCollector = new CountCollector(minScore);
searcher.search(query, countCollector);
searcher.search(query, filter, countCollector);
return countCollector.count();
}

View File

@ -63,9 +63,9 @@ public interface IndexShard extends IndexShardComponent {
byte[] get(String type, String id) throws ElasticSearchException;
long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
long count(float minScore, byte[] querySource, @Nullable String queryParserName, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
long count(float minScore, byte[] querySource, int querySourceOffset, int querySourceLength, @Nullable String queryParserName, String... types) throws ElasticSearchException;
long count(float minScore, byte[] querySource, int querySourceOffset, int querySourceLength, @Nullable String queryParserName, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
void refresh(Engine.Refresh refresh) throws ElasticSearchException;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.shard.service;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.ThreadInterruptedException;
@ -37,6 +38,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.mapper.*;
@ -90,6 +92,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final Translog translog;
private final IndexAliasesService indexAliasesService;
private final Object mutex = new Object();
@ -116,7 +120,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final AtomicLong totalRefreshTime = new AtomicLong();
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) {
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) {
super(shardId, indexSettings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
@ -128,6 +132,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.mapperService = mapperService;
this.queryParserService = queryParserService;
this.indexCache = indexCache;
this.indexAliasesService = indexAliasesService;
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval()));
@ -361,12 +366,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
@Override public long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
return count(minScore, querySource, 0, querySource.length, queryParserName, types);
@Override public long count(float minScore, byte[] querySource, @Nullable String queryParserName, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
return count(minScore, querySource, 0, querySource.length, queryParserName, filteringAliases, types);
}
@Override public long count(float minScore, byte[] querySource, int querySourceOffset, int querySourceLength,
@Nullable String queryParserName, String... types) throws ElasticSearchException {
@Nullable String queryParserName, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
readAllowed();
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
if (queryParserName != null) {
@ -380,9 +385,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// Don't cache it, since it might be very different queries each time...
// query = new ConstantScoreQuery(filterCache.cache(new QueryWrapperFilter(query)));
query = filterByTypesIfNeeded(query, types);
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);
Engine.Searcher searcher = engine.searcher();
try {
long count = Lucene.count(searcher.searcher(), query, minScore);
long count = Lucene.count(searcher.searcher(), query, aliasFilter, minScore);
if (logger.isTraceEnabled()) {
logger.trace("count of [{}] is [{}]", query, count);
}

View File

@ -229,22 +229,32 @@ public class IndexAliasesTests extends AbstractNodesTests {
logger.info("--> checking filtering alias for two indices");
SearchResponse searchResponse = client1.prepareSearch("foos").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "1", "5");
assertThat(client1.prepareCount("foos").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2L));
logger.info("--> checking filtering alias for one index");
searchResponse = client1.prepareSearch("bars").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "2");
assertThat(client1.prepareCount("bars").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1L));
logger.info("--> checking filtering alias for two indices and one complete index");
searchResponse = client1.prepareSearch("foos", "test1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "1", "2", "3", "4", "5");
assertThat(client1.prepareCount("foos", "test1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(5L));
logger.info("--> checking filtering alias for two indices and non-filtering alias for one index");
searchResponse = client1.prepareSearch("foos", "aliasToTest1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "1", "2", "3", "4", "5");
assertThat(client1.prepareCount("foos", "aliasToTest1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(5L));
logger.info("--> checking filtering alias for two indices and non-filtering alias for both indices");
searchResponse = client1.prepareSearch("foos", "aliasToTests").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(8L));
assertThat(client1.prepareCount("foos", "aliasToTests").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(8L));
logger.info("--> checking filtering alias for two indices and non-filtering alias for both indices");
searchResponse = client1.prepareSearch("foos", "aliasToTests").setQuery(QueryBuilders.termQuery("name", "something")).execute().actionGet();
assertHits(searchResponse.hits(), "4", "8");
assertThat(client1.prepareCount("foos", "aliasToTests").setQuery(QueryBuilders.termQuery("name", "something")).execute().actionGet().count(), equalTo(2L));
}
@Test public void testSearchingFilteringAliasesMultipleIndices() throws Exception {
@ -290,21 +300,27 @@ public class IndexAliasesTests extends AbstractNodesTests {
logger.info("--> checking filtering alias for multiple indices");
SearchResponse searchResponse = client1.prepareSearch("filter23", "filter13").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "21", "31", "13", "33");
assertThat(client1.prepareCount("filter23", "filter13").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(4L));
searchResponse = client1.prepareSearch("filter23", "filter1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "21", "31", "11", "12", "13");
assertThat(client1.prepareCount("filter23", "filter1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(5L));
searchResponse = client1.prepareSearch("filter13", "filter1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "11", "12", "13", "33");
assertThat(client1.prepareCount("filter13", "filter1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(4L));
searchResponse = client1.prepareSearch("filter13", "filter1", "filter23").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "11", "12", "13", "21", "31", "33");
assertThat(client1.prepareCount("filter13", "filter1", "filter23").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(6L));
searchResponse = client1.prepareSearch("filter23", "filter13", "test2").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "21", "22", "23", "31", "13", "33");
assertThat(client1.prepareCount("filter23", "filter13", "test2").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(6L));
searchResponse = client1.prepareSearch("filter23", "filter13", "test1", "test2").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "11", "12", "13", "21", "22", "23", "31", "33");
assertThat(client1.prepareCount("filter23", "filter13", "test1", "test2").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(8L));
}