[percolator] Fix memory leak when percolator uses bitset or field data cache.
The percolator no longer closes the IndexReader of the memory index. Prior to 2.x the percolator had its own SearchContext (PercolatorContext) that took care of this, but that was removed when the percolator was refactored as part of the 5.0 release. An alternative way to fix this is to not let the percolator use the bitset and fielddata caches at all; that way we prevent the memory leak. Closes #24108
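In essence, the change gives the percolator a wrapped QueryShardContext that bypasses both shared caches: nested-query bitsets are computed on the fly against the in-memory reader, and field data is built through the non-caching IndexFieldDataCache.None. A condensed sketch of that wrapper follows (the full version, with all imports, is in the PercolateQueryBuilder hunk further down):

    static QueryShardContext wrap(QueryShardContext shardContext) {
        return new QueryShardContext(shardContext) {

            @Override
            public BitSetProducer bitsetFilter(Query query) {
                // Build the bitset for this segment on the fly instead of going through the
                // BitsetFilterCache, so no cache entry is keyed on the never-closed MemoryIndex reader.
                return context -> {
                    IndexSearcher searcher = new IndexSearcher(ReaderUtil.getTopLevelContext(context));
                    searcher.setQueryCache(null);
                    Scorer s = searcher.createNormalizedWeight(query, false).scorer(context);
                    return s == null ? null
                        : new BitDocIdSet(BitSet.of(s.iterator(), context.reader().maxDoc())).bits();
                };
            }

            @Override
            @SuppressWarnings("unchecked")
            public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType) {
                // Load field data with a no-op cache and breaker, bypassing the shared field data cache.
                return (IFD) fieldType.fielddataBuilder().build(shardContext.getIndexSettings(), fieldType,
                    new IndexFieldDataCache.None(), new NoneCircuitBreakerService(), shardContext.getMapperService());
            }
        };
    }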
This commit is contained in:
parent 51b33f1fd5
commit c17de49a6d
@@ -121,8 +121,7 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements I
        }
        final IndexReader.CacheKey coreCacheReader = cacheHelper.getKey();
        final ShardId shardId = ShardUtils.extractShardId(context.reader());
-       if (shardId != null // can't require it because of the percolator
-               && indexSettings.getIndex().equals(shardId.getIndex()) == false) {
+       if (indexSettings.getIndex().equals(shardId.getIndex()) == false) {
            // insanity
            throw new IllegalStateException("Trying to load bit set for index " + shardId.getIndex()
                    + " with cache of index " + indexSettings.getIndex());
@@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.CompositeReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
@@ -31,8 +32,10 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache.Listener;
@@ -48,6 +51,7 @@ import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.mapper.ObjectMapper.Nested;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -289,4 +293,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
            return "ShardSearcher(" + ctx.get(0) + ")";
        }
    }

    protected static DirectoryReader wrap(DirectoryReader directoryReader) throws IOException {
        return ElasticsearchDirectoryReader.wrap(directoryReader, new ShardId(new Index("_index", "_na_"), 0));
    }
}
@@ -65,7 +65,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
        try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
            // intentionally not writing any docs
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                NESTED_OBJECT);
            MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME)
@@ -112,7 +112,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
            }
            iw.commit();
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                NESTED_OBJECT);
            MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME)
@@ -160,7 +160,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
            }
            iw.commit();
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                NESTED_OBJECT + "." + NESTED_OBJECT2);
            MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME)
@@ -213,7 +213,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
            iw.addDocuments(documents);
            iw.commit();
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                NESTED_OBJECT);
            SumAggregationBuilder sumAgg = new SumAggregationBuilder(SUM_AGG_NAME)
@@ -292,7 +292,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
            iw.commit();
            iw.close();
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {

            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                "nested_field");
@@ -54,7 +54,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
        try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
            // intentionally not writing any docs
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                NESTED_OBJECT);
            ReverseNestedAggregationBuilder reverseNestedBuilder
@@ -117,7 +117,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
            }
            iw.commit();
        }
-       try (IndexReader indexReader = DirectoryReader.open(directory)) {
+       try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
            NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
                NESTED_OBJECT);
            ReverseNestedAggregationBuilder reverseNestedBuilder
@@ -23,16 +23,22 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
@@ -51,6 +57,8 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.analysis.FieldNameAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -62,6 +70,8 @@ import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;

import java.io.IOException;
import java.util.Objects;
@@ -412,12 +422,9 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
            docSearcher.setQueryCache(null);
        }

        Version indexVersionCreated = context.getIndexSettings().getIndexVersionCreated();
        boolean mapUnmappedFieldsAsString = context.getIndexSettings()
            .getValue(PercolatorFieldMapper.INDEX_MAP_UNMAPPED_FIELDS_AS_STRING_SETTING);
        // We have to make a copy of the QueryShardContext here so we can have an unfrozen version for parsing the legacy
        // percolator queries
-       QueryShardContext percolateShardContext = new QueryShardContext(context);
+       QueryShardContext percolateShardContext = wrap(context);
        MappedFieldType fieldType = context.fieldMapper(field);
        if (fieldType == null) {
            throw new QueryShardException(context, "field [" + field + "] does not exist");
@@ -503,4 +510,36 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
        };
    }

    static QueryShardContext wrap(QueryShardContext shardContext) {
        return new QueryShardContext(shardContext) {

            @Override
            public BitSetProducer bitsetFilter(Query query) {
                return context -> {
                    final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
                    final IndexSearcher searcher = new IndexSearcher(topLevelContext);
                    searcher.setQueryCache(null);
                    final Weight weight = searcher.createNormalizedWeight(query, false);
                    final Scorer s = weight.scorer(context);

                    if (s != null) {
                        return new BitDocIdSet(BitSet.of(s.iterator(), context.reader().maxDoc())).bits();
                    } else {
                        return null;
                    }
                };
            }

            @Override
            @SuppressWarnings("unchecked")
            public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType) {
                IndexFieldData.Builder builder = fieldType.fielddataBuilder();
                IndexFieldDataCache cache = new IndexFieldDataCache.None();
                CircuitBreakerService circuitBreaker = new NoneCircuitBreakerService();
                return (IFD) builder.build(shardContext.getIndexSettings(), fieldType, cache, circuitBreaker,
                    shardContext.getMapperService());
            }
        };
    }

}
@@ -26,9 +26,12 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
@@ -39,6 +42,7 @@ import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.lookup.LeafDocLookup;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESSingleNodeTestCase;
@@ -83,6 +87,11 @@ public class PercolatorQuerySearchIT extends ESSingleNodeTestCase {
        protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
            Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
            scripts.put("1==1", vars -> Boolean.TRUE);
            scripts.put("use_fielddata_please", vars -> {
                LeafDocLookup leafDocLookup = (LeafDocLookup) vars.get("_doc");
                ScriptDocValues scriptDocValues = leafDocLookup.get("employees.name");
                return "virginia_potts".equals(scriptDocValues.get(0));
            });
            return scripts;
        }
    }
@@ -606,6 +615,119 @@
        assertHitCount(response, 0);
    }

    public void testPercolateQueryWithNestedDocuments_doNotLeakBitsetCacheEntries() throws Exception {
        XContentBuilder mapping = XContentFactory.jsonBuilder();
        mapping.startObject().startObject("properties").startObject("companyname").field("type", "text").endObject()
            .startObject("employee").field("type", "nested").startObject("properties")
            .startObject("name").field("type", "text").endObject().endObject().endObject().endObject()
            .endObject();
        createIndex("test", client().admin().indices().prepareCreate("test")
            // to avoid the normal documents from being cached by the BitsetFilterCache
            .setSettings(Settings.builder().put(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), false))
            .addMapping("employee", mapping)
            .addMapping("queries", "query", "type=percolator")
        );
        client().prepareIndex("test", "queries", "q1").setSource(jsonBuilder().startObject()
            .field("query", QueryBuilders.nestedQuery("employee",
                QueryBuilders.matchQuery("employee.name", "virginia potts").operator(Operator.AND), ScoreMode.Avg)
            ).endObject())
            .get();
        client().admin().indices().prepareRefresh().get();

        for (int i = 0; i < 32; i++) {
            SearchResponse response = client().prepareSearch()
                .setQuery(new PercolateQueryBuilder("query", "employee",
                    XContentFactory.jsonBuilder()
                        .startObject().field("companyname", "stark")
                        .startArray("employee")
                        .startObject().field("name", "virginia potts").endObject()
                        .startObject().field("name", "tony stark").endObject()
                        .endArray()
                        .endObject().bytes(), XContentType.JSON))
                .addSort("_doc", SortOrder.ASC)
                // size 0, because otherwise we also load bitsets for the normal documents in FetchPhase#findRootDocumentIfNested(...)
                .setSize(0)
                .get();
            assertHitCount(response, 1);
        }

        // We can't check this via the API, because the BitsetCacheListener requires that it can extract a shardId from the
        // index reader, and for the percolator it can't do that; that also means we don't keep track of
        // BitsetCache memory in the case of the percolator.
        long bitsetSize = client().admin().cluster().prepareClusterStats().get()
            .getIndicesStats().getSegments().getBitsetMemoryInBytes();
        assertEquals("The percolator works with an in-memory index and therefore shouldn't use the bitset cache", 0L, bitsetSize);
    }

    public void testPercolateQueryWithNestedDocuments_doLeakFieldDataCacheEntries() throws Exception {
        XContentBuilder mapping = XContentFactory.jsonBuilder();
        mapping.startObject();
        {
            mapping.startObject("properties");
            {
                mapping.startObject("companyname");
                mapping.field("type", "text");
                mapping.endObject();
            }
            {
                mapping.startObject("employees");
                mapping.field("type", "nested");
                {
                    mapping.startObject("properties");
                    {
                        mapping.startObject("name");
                        mapping.field("type", "text");
                        mapping.field("fielddata", true);
                        mapping.endObject();
                    }
                    mapping.endObject();
                }
                mapping.endObject();
            }
            mapping.endObject();
        }
        mapping.endObject();
        createIndex("test", client().admin().indices().prepareCreate("test")
            .addMapping("employee", mapping)
            .addMapping("queries", "query", "type=percolator")
        );
        Script script = new Script(ScriptType.INLINE, MockScriptPlugin.NAME, "use_fielddata_please", Collections.emptyMap());
        client().prepareIndex("test", "queries", "q1").setSource(jsonBuilder().startObject()
            .field("query", QueryBuilders.nestedQuery("employees",
                QueryBuilders.scriptQuery(script), ScoreMode.Avg)
            ).endObject()).get();
        client().admin().indices().prepareRefresh().get();
        XContentBuilder doc = jsonBuilder();
        doc.startObject();
        {
            doc.field("companyname", "stark");
            doc.startArray("employees");
            {
                doc.startObject();
                doc.field("name", "virginia_potts");
                doc.endObject();
            }
            {
                doc.startObject();
                doc.field("name", "tony_stark");
                doc.endObject();
            }
            doc.endArray();
        }
        doc.endObject();
        for (int i = 0; i < 32; i++) {
            SearchResponse response = client().prepareSearch()
                .setQuery(new PercolateQueryBuilder("query", "employee", doc.bytes(), XContentType.JSON))
                .addSort("_doc", SortOrder.ASC)
                .get();
            assertHitCount(response, 1);
        }

        long fieldDataSize = client().admin().cluster().prepareClusterStats().get()
            .getIndicesStats().getFieldData().getMemorySizeInBytes();
        assertEquals("The percolator works with an in-memory index and therefore shouldn't use the field-data cache", 0L, fieldDataSize);
    }

    public void testPercolatorQueryViaMultiSearch() throws Exception {
        createIndex("test", client().admin().indices().prepareCreate("test")
            .addMapping("type", "field1", "type=text")