unify more count and search implementation
This commit is contained in:
parent
bc2887344e
commit
e3a9271000
|
@ -30,10 +30,18 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.internal.InternalSearchRequest;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.query.QueryPhaseExecutionException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -51,10 +59,14 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
|
|||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final ScriptService scriptService;
|
||||
|
||||
@Inject
|
||||
public TransportCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
|
||||
public TransportCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
||||
IndicesService indicesService, ScriptService scriptService) {
|
||||
super(settings, threadPool, clusterService, transportService);
|
||||
this.indicesService = indicesService;
|
||||
this.scriptService = scriptService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -130,9 +142,40 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
|
|||
|
||||
@Override
|
||||
protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
||||
long count = indexShard.count(request.minScore(), request.querySource(),
|
||||
request.filteringAliases(), request.types());
|
||||
return new ShardCountResponse(request.index(), request.shardId(), count);
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
IndexShard indexShard = indexService.shardSafe(request.shardId());
|
||||
|
||||
SearchContext context = new SearchContext(0,
|
||||
new InternalSearchRequest().types(request.types()).filteringAliases(request.filteringAliases()),
|
||||
null, indexShard.searcher(), indexService, indexShard,
|
||||
scriptService);
|
||||
SearchContext.setCurrent(context);
|
||||
|
||||
try {
|
||||
// TODO: min score should move to be "null" as a value that is not initialized...
|
||||
if (request.minScore() != -1) {
|
||||
context.minimumScore(request.minScore());
|
||||
}
|
||||
BytesReference querySource = request.querySource();
|
||||
if (querySource != null && querySource.length() > 0) {
|
||||
try {
|
||||
QueryParseContext.setTypes(request.types());
|
||||
context.parsedQuery(indexService.queryParserService().parse(querySource));
|
||||
} finally {
|
||||
QueryParseContext.removeTypes();
|
||||
}
|
||||
}
|
||||
context.preProcess();
|
||||
try {
|
||||
long count = Lucene.count(context.searcher(), context.query());
|
||||
return new ShardCountResponse(request.index(), request.shardId(), count);
|
||||
} catch (Exception e) {
|
||||
throw new QueryPhaseExecutionException(context, "failed to execute count", e);
|
||||
}
|
||||
} finally {
|
||||
// this will also release the index searcher
|
||||
context.release();
|
||||
SearchContext.removeCurrent();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,4 +96,9 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
|
|||
out.writeVInt(shardId);
|
||||
out.writeUTF(reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + index + "][" + shardId + "] failed, reason [" + reason + "]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,14 +82,14 @@ public class Lucene {
|
|||
return defaultVersion;
|
||||
}
|
||||
|
||||
public static long count(IndexSearcher searcher, Query query, float minScore) throws IOException {
|
||||
return count(searcher, query, null, minScore);
|
||||
}
|
||||
|
||||
public static long count(IndexSearcher searcher, Query query, Filter filter, float minScore) throws IOException {
|
||||
CountCollector countCollector = new CountCollector(minScore);
|
||||
searcher.search(query, filter, countCollector);
|
||||
return countCollector.count();
|
||||
public static long count(IndexSearcher searcher, Query query) throws IOException {
|
||||
TotalHitCountCollector countCollector = new TotalHitCountCollector();
|
||||
// we don't need scores, so wrap it in a constant score query
|
||||
if (!(query instanceof ConstantScoreQuery)) {
|
||||
query = new ConstantScoreQuery(query);
|
||||
}
|
||||
searcher.search(query, countCollector);
|
||||
return countCollector.getTotalHits();
|
||||
}
|
||||
|
||||
public static int docId(IndexReader reader, Term term) throws IOException {
|
||||
|
@ -320,42 +320,6 @@ public class Lucene {
|
|||
}
|
||||
}
|
||||
|
||||
public static class CountCollector extends Collector {
|
||||
|
||||
private final float minScore;
|
||||
private Scorer scorer;
|
||||
private long count;
|
||||
|
||||
public CountCollector(float minScore) {
|
||||
this.minScore = minScore;
|
||||
}
|
||||
|
||||
public long count() {
|
||||
return this.count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScorer(Scorer scorer) throws IOException {
|
||||
this.scorer = scorer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(int doc) throws IOException {
|
||||
if (scorer.score() > minScore) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextReader(IndexReader reader, int docBase) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean acceptsDocsOutOfOrder() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class ExistsCollector extends Collector {
|
||||
|
||||
private boolean exists;
|
||||
|
|
|
@ -110,6 +110,9 @@ public class Queries {
|
|||
}
|
||||
|
||||
public static boolean isMatchAllQuery(Query query) {
|
||||
if (query == Queries.MATCH_ALL_QUERY) {
|
||||
return true;
|
||||
}
|
||||
if (query instanceof MatchAllDocsQuery) {
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -398,6 +398,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
|
|||
/**
|
||||
* A filter for search. If a filter is required, will return it, otherwise, will return <tt>null</tt>.
|
||||
*/
|
||||
@Nullable
|
||||
public Filter searchFilter(String... types) {
|
||||
if (types == null || types.length == 0) {
|
||||
if (hasNested) {
|
||||
|
|
|
@ -96,8 +96,6 @@ public interface IndexShard extends IndexShardComponent {
|
|||
|
||||
Engine.GetResult get(Engine.Get get) throws ElasticSearchException;
|
||||
|
||||
long count(float minScore, BytesReference querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
|
||||
|
||||
void refresh(Engine.Refresh refresh) throws ElasticSearchException;
|
||||
|
||||
void flush(Engine.Flush flush) throws ElasticSearchException;
|
||||
|
|
|
@ -37,8 +37,6 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FastByteArrayOutputStream;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -54,7 +52,6 @@ import org.elasticsearch.index.mapper.*;
|
|||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.index.refresh.RefreshStats;
|
||||
import org.elasticsearch.index.search.nested.IncludeAllChildrenQuery;
|
||||
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
|
||||
|
@ -402,39 +399,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
return engine.get(get);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count(float minScore, BytesReference querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
|
||||
readAllowed();
|
||||
Query query;
|
||||
if (querySource == null || querySource.length() == 0) {
|
||||
query = Queries.MATCH_ALL_QUERY;
|
||||
} else {
|
||||
try {
|
||||
QueryParseContext.setTypes(types);
|
||||
query = queryParserService.parse(querySource).query();
|
||||
} finally {
|
||||
QueryParseContext.removeTypes();
|
||||
}
|
||||
}
|
||||
// wrap it in filter, cache it, and constant score it
|
||||
// Don't cache it, since it might be very different queries each time...
|
||||
// query = new ConstantScoreQuery(filterCache.cache(new QueryWrapperFilter(query)));
|
||||
query = filterQueryIfNeeded(query, types);
|
||||
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);
|
||||
Engine.Searcher searcher = engine.searcher();
|
||||
try {
|
||||
long count = Lucene.count(searcher.searcher(), query, aliasFilter, minScore);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("count of [{}] is [{}]", query, count);
|
||||
}
|
||||
return count;
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchException("Failed to count query [" + query + "]", e);
|
||||
} finally {
|
||||
searcher.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh(Engine.Refresh refresh) throws ElasticSearchException {
|
||||
verifyStarted();
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.search;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
|
@ -490,9 +489,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
context.size(10);
|
||||
}
|
||||
|
||||
Filter aliasFilter = indexService.aliasesService().aliasFilter(request.filteringAliases());
|
||||
context.aliasFilter(aliasFilter);
|
||||
|
||||
// pre process
|
||||
dfsPhase.preProcess(context);
|
||||
queryPhase.preProcess(context);
|
||||
|
|
|
@ -140,8 +140,9 @@ public class InternalSearchRequest implements Streamable {
|
|||
return filteringAliases;
|
||||
}
|
||||
|
||||
public void filteringAliases(String[] filteringAliases) {
|
||||
public InternalSearchRequest filteringAliases(String[] filteringAliases) {
|
||||
this.filteringAliases = filteringAliases;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String[] types() {
|
||||
|
|
|
@ -21,13 +21,14 @@ package org.elasticsearch.search.internal;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
import org.elasticsearch.common.lucene.search.function.BoostScoreFunction;
|
||||
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.cache.field.data.FieldDataCache;
|
||||
import org.elasticsearch.index.cache.filter.FilterCache;
|
||||
|
@ -182,6 +183,9 @@ public class SearchContext implements Releasable {
|
|||
this.indexService = indexService;
|
||||
|
||||
this.searcher = new ContextIndexSearcher(this, engineSearcher);
|
||||
|
||||
// initialize the filtering alias based on the provided filters
|
||||
aliasFilter = indexService.aliasesService().aliasFilter(request.filteringAliases());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -205,6 +209,28 @@ public class SearchContext implements Releasable {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should be called before executing the main query and after all other parameters have been set.
|
||||
*/
|
||||
public void preProcess() {
|
||||
if (query() == null) {
|
||||
parsedQuery(ParsedQuery.MATCH_ALL_PARSED_QUERY);
|
||||
}
|
||||
if (queryBoost() != 1.0f) {
|
||||
parsedQuery(new ParsedQuery(new FunctionScoreQuery(query(), new BoostScoreFunction(queryBoost)), parsedQuery()));
|
||||
}
|
||||
Filter searchFilter = mapperService().searchFilter(types());
|
||||
if (searchFilter != null) {
|
||||
if (Queries.isMatchAllQuery(query())) {
|
||||
Query q = new DeletionAwareConstantScoreQuery(filterCache().cache(searchFilter));
|
||||
q.setBoost(query().getBoost());
|
||||
parsedQuery(new ParsedQuery(q, parsedQuery()));
|
||||
} else {
|
||||
parsedQuery(new ParsedQuery(new FilteredQuery(query(), filterCache().cache(searchFilter)), parsedQuery()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public long id() {
|
||||
return this.id;
|
||||
}
|
||||
|
@ -383,11 +409,6 @@ public class SearchContext implements Releasable {
|
|||
return this.filter;
|
||||
}
|
||||
|
||||
public SearchContext aliasFilter(Filter aliasFilter) {
|
||||
this.aliasFilter = aliasFilter;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Filter aliasFilter() {
|
||||
return aliasFilter;
|
||||
}
|
||||
|
|
|
@ -20,14 +20,13 @@
|
|||
package org.elasticsearch.search.query;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TotalHitCountCollector;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
import org.elasticsearch.common.lucene.search.function.BoostScoreFunction;
|
||||
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
|
||||
import org.elasticsearch.index.query.ParsedQuery;
|
||||
import org.elasticsearch.search.SearchParseElement;
|
||||
import org.elasticsearch.search.SearchPhase;
|
||||
import org.elasticsearch.search.facet.FacetPhase;
|
||||
|
@ -75,22 +74,7 @@ public class QueryPhase implements SearchPhase {
|
|||
|
||||
@Override
|
||||
public void preProcess(SearchContext context) {
|
||||
if (context.query() == null) {
|
||||
context.parsedQuery(ParsedQuery.MATCH_ALL_PARSED_QUERY);
|
||||
}
|
||||
if (context.queryBoost() != 1.0f) {
|
||||
context.parsedQuery(new ParsedQuery(new FunctionScoreQuery(context.query(), new BoostScoreFunction(context.queryBoost())), context.parsedQuery()));
|
||||
}
|
||||
Filter searchFilter = context.mapperService().searchFilter(context.types());
|
||||
if (searchFilter != null) {
|
||||
if (Queries.isMatchAllQuery(context.query())) {
|
||||
Query q = new DeletionAwareConstantScoreQuery(context.filterCache().cache(searchFilter));
|
||||
q.setBoost(context.query().getBoost());
|
||||
context.parsedQuery(new ParsedQuery(q, context.parsedQuery()));
|
||||
} else {
|
||||
context.parsedQuery(new ParsedQuery(new FilteredQuery(context.query(), context.filterCache().cache(searchFilter)), context.parsedQuery()));
|
||||
}
|
||||
}
|
||||
context.preProcess();
|
||||
facetPhase.preProcess(context);
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ public class MatchAllDocsFilterTests {
|
|||
IndexSearcher searcher = new IndexSearcher(reader);
|
||||
|
||||
DeletionAwareConstantScoreQuery query = new DeletionAwareConstantScoreQuery(Queries.MATCH_ALL_FILTER);
|
||||
long count = Lucene.count(searcher, query, -1);
|
||||
long count = Lucene.count(searcher, query);
|
||||
assertThat(count, equalTo(2l));
|
||||
|
||||
reader.close();
|
||||
|
|
|
@ -57,7 +57,7 @@ public class MoreLikeThisQueryTests {
|
|||
mltQuery.setLikeText("lucene");
|
||||
mltQuery.setMinTermFrequency(1);
|
||||
mltQuery.setMinDocFreq(1);
|
||||
long count = Lucene.count(searcher, mltQuery, -1);
|
||||
long count = Lucene.count(searcher, mltQuery);
|
||||
assertThat(count, equalTo(2l));
|
||||
|
||||
reader.close();
|
||||
|
|
|
@ -48,22 +48,22 @@ public class MultiPhrasePrefixQueryTests {
|
|||
|
||||
MultiPhrasePrefixQuery query = new MultiPhrasePrefixQuery();
|
||||
query.add(new Term("field", "aa"));
|
||||
assertThat(Lucene.count(searcher, query, 0), equalTo(1l));
|
||||
assertThat(Lucene.count(searcher, query), equalTo(1l));
|
||||
|
||||
query = new MultiPhrasePrefixQuery();
|
||||
query.add(new Term("field", "aaa"));
|
||||
query.add(new Term("field", "bb"));
|
||||
assertThat(Lucene.count(searcher, query, 0), equalTo(1l));
|
||||
assertThat(Lucene.count(searcher, query), equalTo(1l));
|
||||
|
||||
query = new MultiPhrasePrefixQuery();
|
||||
query.setSlop(1);
|
||||
query.add(new Term("field", "aaa"));
|
||||
query.add(new Term("field", "cc"));
|
||||
assertThat(Lucene.count(searcher, query, 0), equalTo(1l));
|
||||
assertThat(Lucene.count(searcher, query), equalTo(1l));
|
||||
|
||||
query = new MultiPhrasePrefixQuery();
|
||||
query.setSlop(1);
|
||||
query.add(new Term("field", "xxx"));
|
||||
assertThat(Lucene.count(searcher, query, 0), equalTo(0l));
|
||||
assertThat(Lucene.count(searcher, query), equalTo(0l));
|
||||
}
|
||||
}
|
|
@ -65,8 +65,8 @@ public class FilterCacheTests {
|
|||
|
||||
reader = refreshReader(reader);
|
||||
IndexSearcher searcher = new IndexSearcher(reader);
|
||||
assertThat(Lucene.count(searcher, new ConstantScoreQuery(filterCache.cache(new TermFilter(new Term("id", "1")))), -1), equalTo(1l));
|
||||
assertThat(Lucene.count(searcher, new FilteredQuery(new MatchAllDocsQuery(), filterCache.cache(new TermFilter(new Term("id", "1")))), -1), equalTo(1l));
|
||||
assertThat(Lucene.count(searcher, new ConstantScoreQuery(filterCache.cache(new TermFilter(new Term("id", "1"))))), equalTo(1l));
|
||||
assertThat(Lucene.count(searcher, new FilteredQuery(new MatchAllDocsQuery(), filterCache.cache(new TermFilter(new Term("id", "1"))))), equalTo(1l));
|
||||
|
||||
indexWriter.deleteDocuments(new Term("id", "1"));
|
||||
reader = refreshReader(reader);
|
||||
|
@ -75,9 +75,9 @@ public class FilterCacheTests {
|
|||
Filter cachedFilter = filterCache.cache(filter);
|
||||
long constantScoreCount = filter == cachedFilter ? 0 : 1;
|
||||
// sadly, when caching based on cacheKey with NRT, this fails, that's why we have DeletionAware one
|
||||
assertThat(Lucene.count(searcher, new ConstantScoreQuery(cachedFilter), -1), equalTo(constantScoreCount));
|
||||
assertThat(Lucene.count(searcher, new DeletionAwareConstantScoreQuery(cachedFilter), -1), equalTo(0l));
|
||||
assertThat(Lucene.count(searcher, new FilteredQuery(new MatchAllDocsQuery(), cachedFilter), -1), equalTo(0l));
|
||||
assertThat(Lucene.count(searcher, new ConstantScoreQuery(cachedFilter)), equalTo(constantScoreCount));
|
||||
assertThat(Lucene.count(searcher, new DeletionAwareConstantScoreQuery(cachedFilter)), equalTo(0l));
|
||||
assertThat(Lucene.count(searcher, new FilteredQuery(new MatchAllDocsQuery(), cachedFilter)), equalTo(0l));
|
||||
|
||||
indexWriter.close();
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ public final class EngineSearcherTotalHitsMatcher extends TypeSafeMatcher<Engine
|
|||
@Override
|
||||
public boolean matchesSafely(Engine.Searcher searcher) {
|
||||
try {
|
||||
long count = Lucene.count(searcher.searcher(), query, -1f);
|
||||
long count = Lucene.count(searcher.searcher(), query);
|
||||
return count == totalHits;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
|
|
Loading…
Reference in New Issue