Removed CachedDfSource and move the dfs logic into the ContextIndexSearcher

This commit is contained in:
Martijn van Groningen 2015-08-18 22:47:30 +02:00
parent 0fc96ede69
commit f2f95ea115
5 changed files with 55 additions and 148 deletions

View File

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
@ -54,7 +53,6 @@ import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldData;
@ -82,7 +80,6 @@ import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.Template; import org.elasticsearch.script.Template;
import org.elasticsearch.script.mustache.MustacheScriptEngineService; import org.elasticsearch.script.mustache.MustacheScriptEngineService;
import org.elasticsearch.search.dfs.CachedDfSource;
import org.elasticsearch.search.dfs.DfsPhase; import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.*; import org.elasticsearch.search.fetch.*;
@ -412,17 +409,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) { public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
final SearchContext context = findContext(request.id()); final SearchContext context = findContext(request.id());
contextProcessing(context); contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
IndexShard indexShard = context.indexShard(); IndexShard indexShard = context.indexShard();
try {
final IndexCache indexCache = indexShard.indexService().cache();
final QueryCachingPolicy cachingPolicy = indexShard.getQueryCachingPolicy();
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity(),
indexCache.query(), cachingPolicy));
} catch (Throwable e) {
processFailure(context, e);
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
ShardSearchStats shardSearchStats = indexShard.searchService(); ShardSearchStats shardSearchStats = indexShard.searchService();
try { try {
shardSearchStats.onPreQueryPhase(context); shardSearchStats.onPreQueryPhase(context);
@ -488,17 +476,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) { public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) {
final SearchContext context = findContext(request.id()); final SearchContext context = findContext(request.id());
contextProcessing(context); contextProcessing(context);
try { context.searcher().setAggregatedDfs(request.dfs());
final IndexShard indexShard = context.indexShard();
final IndexCache indexCache = indexShard.indexService().cache();
final QueryCachingPolicy cachingPolicy = indexShard.getQueryCachingPolicy();
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity(),
indexCache.query(), cachingPolicy));
} catch (Throwable e) {
freeContext(context.id());
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
try { try {
ShardSearchStats shardSearchStats = context.indexShard().searchService(); ShardSearchStats shardSearchStats = context.indexShard().searchService();
shardSearchStats.onPreQueryPhase(context); shardSearchStats.onPreQueryPhase(context);

View File

@ -1,97 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.dfs;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.search.similarities.Similarity;
import java.io.IOException;
import java.util.List;
/**
*
*/
public class CachedDfSource extends IndexSearcher {
private final AggregatedDfs aggregatedDfs;
private final int maxDoc;
public CachedDfSource(IndexReader reader, AggregatedDfs aggregatedDfs, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
super(reader);
this.aggregatedDfs = aggregatedDfs;
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
if (aggregatedDfs.maxDoc() > Integer.MAX_VALUE) {
maxDoc = Integer.MAX_VALUE;
} else {
maxDoc = (int) aggregatedDfs.maxDoc();
}
}
@Override
public TermStatistics termStatistics(Term term, TermContext context) throws IOException {
TermStatistics termStatistics = aggregatedDfs.termStatistics().get(term);
if (termStatistics == null) {
// we don't have stats for this - this might be a must_not clauses etc. that doesn't allow extract terms on the query
return super.termStatistics(term, context);
}
return termStatistics;
}
@Override
public CollectionStatistics collectionStatistics(String field) throws IOException {
CollectionStatistics collectionStatistics = aggregatedDfs.fieldStatistics().get(field);
if (collectionStatistics == null) {
// we don't have stats for this - this might be a must_not clauses etc. that doesn't allow extract terms on the query
return super.collectionStatistics(field);
}
return collectionStatistics;
}
public int maxDoc() {
return this.maxDoc;
}
@Override
public Document doc(int i) {
throw new UnsupportedOperationException();
}
@Override
public void doc(int docID, StoredFieldVisitor fieldVisitor) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Explanation explain(Weight weight, int doc) {
throw new UnsupportedOperationException();
}
@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
throw new UnsupportedOperationException();
}
}

View File

@ -20,15 +20,13 @@
package org.elasticsearch.search.internal; package org.elasticsearch.search.internal;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector; import org.apache.lucene.index.Term;
import org.apache.lucene.search.Explanation; import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.*;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.search.dfs.CachedDfSource; import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.internal.SearchContext.Lifetime;
import java.io.IOException; import java.io.IOException;
@ -46,21 +44,23 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private final SearchContext searchContext; private final SearchContext searchContext;
private CachedDfSource dfSource; private AggregatedDfs aggregatedDfs;
public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) { public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) {
super(searcher.reader()); super(searcher.reader());
in = searcher.searcher(); in = searcher.searcher();
this.searchContext = searchContext; this.searchContext = searchContext;
setSimilarity(searcher.searcher().getSimilarity(true)); setSimilarity(searcher.searcher().getSimilarity(true));
setQueryCache(searchContext.indexShard().indexService().cache().query());
setQueryCachingPolicy(searchContext.indexShard().getQueryCachingPolicy());
} }
@Override @Override
public void close() { public void close() {
} }
public void dfSource(CachedDfSource dfSource) { public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
this.dfSource = dfSource; this.aggregatedDfs = aggregatedDfs;
} }
@Override @Override
@ -75,10 +75,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
@Override @Override
public Weight createNormalizedWeight(Query query, boolean needsScores) throws IOException { public Weight createNormalizedWeight(Query query, boolean needsScores) throws IOException {
// During tests we prefer to use the wrapped IndexSearcher, because then we use the AssertingIndexSearcher
// it is hacky, because if we perform a dfs search, we don't use the wrapped IndexSearcher...
try { try {
// if scores are needed and we have dfs data then use it // if scores are needed and we have dfs data then use it
if (dfSource != null && needsScores) { if (aggregatedDfs != null && needsScores) {
return dfSource.createNormalizedWeight(query, needsScores); return super.createNormalizedWeight(query, needsScores);
} }
return in.createNormalizedWeight(query, needsScores); return in.createNormalizedWeight(query, needsScores);
} catch (Throwable t) { } catch (Throwable t) {
@ -104,4 +106,32 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
searchContext.clearReleasables(Lifetime.COLLECTION); searchContext.clearReleasables(Lifetime.COLLECTION);
} }
} }
@Override
public TermStatistics termStatistics(Term term, TermContext context) throws IOException {
if (aggregatedDfs == null) {
// we are either executing the dfs phase or the search_type doesn't include the dfs phase.
return super.termStatistics(term, context);
}
TermStatistics termStatistics = aggregatedDfs.termStatistics().get(term);
if (termStatistics == null) {
// we don't have stats for this - this might be a must_not clauses etc. that doesn't allow extract terms on the query
return super.termStatistics(term, context);
}
return termStatistics;
}
@Override
public CollectionStatistics collectionStatistics(String field) throws IOException {
if (aggregatedDfs == null) {
// we are either executing the dfs phase or the search_type doesn't include the dfs phase.
return super.collectionStatistics(field);
}
CollectionStatistics collectionStatistics = aggregatedDfs.fieldStatistics().get(field);
if (collectionStatistics == null) {
// we don't have stats for this - this might be a must_not clauses etc. that doesn't allow extract terms on the query
return super.collectionStatistics(field);
}
return collectionStatistics;
}
} }

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
@ -46,12 +45,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/** /**
* A test that keep a singleton node started for all tests that can be used to get * A test that keep a singleton node started for all tests that can be used to get
@ -225,7 +221,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
BigArrays bigArrays = indexService.injector().getInstance(BigArrays.class); BigArrays bigArrays = indexService.injector().getInstance(BigArrays.class);
ThreadPool threadPool = indexService.injector().getInstance(ThreadPool.class); ThreadPool threadPool = indexService.injector().getInstance(ThreadPool.class);
PageCacheRecycler pageCacheRecycler = indexService.injector().getInstance(PageCacheRecycler.class); PageCacheRecycler pageCacheRecycler = indexService.injector().getInstance(PageCacheRecycler.class);
return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, indexService, indexService.cache().query(), indexService.fieldData()); return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, indexService);
} }
/** /**

View File

@ -19,22 +19,19 @@
package org.elasticsearch.test; package org.elasticsearch.test;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer; import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import org.apache.lucene.search.*;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter; import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.*; import org.elasticsearch.common.HasContext;
import org.elasticsearch.common.HasContextAndHeaders;
import org.elasticsearch.common.HasHeaders;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
@ -76,6 +73,7 @@ public class TestSearchContext extends SearchContext {
final BitsetFilterCache fixedBitSetFilterCache; final BitsetFilterCache fixedBitSetFilterCache;
final ThreadPool threadPool; final ThreadPool threadPool;
final Map<Class<?>, Collector> queryCollectors = new HashMap<>(); final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
final IndexShard indexShard;
ContextIndexSearcher searcher; ContextIndexSearcher searcher;
int size; int size;
@ -86,7 +84,7 @@ public class TestSearchContext extends SearchContext {
private final long originNanoTime = System.nanoTime(); private final long originNanoTime = System.nanoTime();
private final Map<String, FetchSubPhaseContext> subPhaseContexts = new HashMap<>(); private final Map<String, FetchSubPhaseContext> subPhaseContexts = new HashMap<>();
public TestSearchContext(ThreadPool threadPool,PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, QueryCache filterCache, IndexFieldDataService indexFieldDataService) { public TestSearchContext(ThreadPool threadPool,PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService) {
super(ParseFieldMatcher.STRICT); super(ParseFieldMatcher.STRICT);
this.pageCacheRecycler = pageCacheRecycler; this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays.withCircuitBreaking(); this.bigArrays = bigArrays.withCircuitBreaking();
@ -94,6 +92,7 @@ public class TestSearchContext extends SearchContext {
this.indexFieldDataService = indexService.fieldData(); this.indexFieldDataService = indexService.fieldData();
this.fixedBitSetFilterCache = indexService.bitsetFilterCache(); this.fixedBitSetFilterCache = indexService.bitsetFilterCache();
this.threadPool = threadPool; this.threadPool = threadPool;
this.indexShard = indexService.shard(0);
} }
public TestSearchContext() { public TestSearchContext() {
@ -104,6 +103,7 @@ public class TestSearchContext extends SearchContext {
this.indexFieldDataService = null; this.indexFieldDataService = null;
this.threadPool = null; this.threadPool = null;
this.fixedBitSetFilterCache = null; this.fixedBitSetFilterCache = null;
this.indexShard = null;
} }
public void setTypes(String... types) { public void setTypes(String... types) {
@ -282,7 +282,7 @@ public class TestSearchContext extends SearchContext {
@Override @Override
public IndexShard indexShard() { public IndexShard indexShard() {
return null; return indexShard;
} }
@Override @Override