Search could leak memory if global ordinals were calculated as part of a search with low level cancellation enabled. QueryPhase registers a cancellation on the reader that is never removed, which ends up being referenced from the global ordinals cache entry. This keeps an indirect reference to the search context. A significant leak can occur when a heavy aggregation (cardinality for instance) is used and a failure occurs during search, in particular if the pages backing the hyperlog++ structure are not recycled when it is closed. This commit also fixes an issue with an unclosed resource and request breaker adjustment in the cardinality aggregation.
This commit is contained in:
parent
f6b3148e5e
commit
867d5f1c68
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.metrics;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.BucketOrder;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
|
||||
|
||||
public class CardinalityWithRequestBreakerIT extends ESIntegTestCase {
|
||||
|
||||
/**
|
||||
* Test that searches using cardinality aggregations returns all request breaker memory.
|
||||
*/
|
||||
public void testRequestBreaker() throws Exception {
|
||||
final String requestBreaker = randomIntBetween(1, 10000) + "kb";
|
||||
logger.info("--> Using request breaker setting: {}", requestBreaker);
|
||||
|
||||
indexRandom(true, IntStream.range(0, randomIntBetween(10, 1000))
|
||||
.mapToObj(i ->
|
||||
client().prepareIndex("test", "_doc").setId("id_" + i)
|
||||
.setSource(org.elasticsearch.common.collect.Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5)))
|
||||
).toArray(IndexRequestBuilder[]::new));
|
||||
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(),
|
||||
requestBreaker))
|
||||
.get();
|
||||
|
||||
try {
|
||||
client().prepareSearch("test")
|
||||
.addAggregation(terms("terms").field("field0.keyword")
|
||||
.collectMode(randomFrom(Aggregator.SubAggCollectionMode.values()))
|
||||
.order(BucketOrder.aggregation("cardinality", randomBoolean()))
|
||||
.subAggregation(cardinality("cardinality").precisionThreshold(randomLongBetween(1, 40000)).field("field1.keyword")))
|
||||
.get();
|
||||
} catch (ElasticsearchException e) {
|
||||
if (ExceptionsHelper.unwrap(e, CircuitBreakingException.class) == null) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()))
|
||||
.get();
|
||||
|
||||
// validation done by InternalTestCluster.ensureEstimatedStats()
|
||||
}
|
||||
}
|
|
@ -114,6 +114,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
|||
this.addWithoutBreaking(-bytes);
|
||||
throw e;
|
||||
}
|
||||
assert newUsed >= 0 : "Used bytes: [" + newUsed + "] must be >= 0";
|
||||
return newUsed;
|
||||
}
|
||||
|
||||
|
|
|
@ -191,7 +191,7 @@ final class DefaultSearchContext extends SearchContext {
|
|||
|
||||
@Override
|
||||
public void doClose() {
|
||||
Releasables.close(engineSearcher);
|
||||
Releasables.close(engineSearcher, searcher);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -124,8 +124,8 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
|
|||
if (collector != null) {
|
||||
try {
|
||||
collector.postCollect();
|
||||
collector.close();
|
||||
} finally {
|
||||
collector.close();
|
||||
collector = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.lucene.util.BitSetIterator;
|
|||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.CombinedBitSet;
|
||||
import org.apache.lucene.util.SparseFixedBitSet;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
||||
import org.elasticsearch.search.DocValueFormat;
|
||||
import org.elasticsearch.search.dfs.AggregatedDfs;
|
||||
|
@ -70,7 +71,7 @@ import java.util.Set;
|
|||
/**
|
||||
* Context-aware extension of {@link IndexSearcher}.
|
||||
*/
|
||||
public class ContextIndexSearcher extends IndexSearcher {
|
||||
public class ContextIndexSearcher extends IndexSearcher implements Releasable {
|
||||
/**
|
||||
* The interval at which we check for search cancellation when we cannot use
|
||||
* a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}.
|
||||
|
@ -118,6 +119,19 @@ public class ContextIndexSearcher extends IndexSearcher {
|
|||
this.cancellable.remove(action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// clear the list of cancellables when closing the owning search context, since the ExitableDirectoryReader might be cached (for
|
||||
// instance in fielddata cache).
|
||||
// A cancellable can contain an indirect reference to the search context, which potentially retains a significant amount
|
||||
// of memory.
|
||||
this.cancellable.clear();
|
||||
}
|
||||
|
||||
public boolean hasCancellations() {
|
||||
return this.cancellable.isEnabled();
|
||||
}
|
||||
|
||||
public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
|
||||
this.aggregatedDfs = aggregatedDfs;
|
||||
}
|
||||
|
@ -361,5 +375,9 @@ public class ContextIndexSearcher extends IndexSearcher {
|
|||
public boolean isEnabled() {
|
||||
return runnables.isEmpty() == false;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
runnables.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,10 +58,14 @@ import org.elasticsearch.search.rescore.RescoreContext;
|
|||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
import org.elasticsearch.search.sort.SortAndFormats;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
|
@ -203,4 +207,47 @@ public class DefaultSearchContextTests extends ESTestCase {
|
|||
|
||||
}
|
||||
}
|
||||
|
||||
public void testClearQueryCancellationsOnClose() throws IOException {
|
||||
TimeValue timeout = new TimeValue(randomIntBetween(1, 100));
|
||||
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
|
||||
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
|
||||
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
|
||||
when(shardSearchRequest.shardId()).thenReturn(shardId);
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool(this.getClass().getName());
|
||||
IndexShard indexShard = mock(IndexShard.class);
|
||||
QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class);
|
||||
when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
|
||||
when(indexShard.getThreadPool()).thenReturn(threadPool);
|
||||
|
||||
IndexService indexService = mock(IndexService.class);
|
||||
QueryShardContext queryShardContext = mock(QueryShardContext.class);
|
||||
when(indexService.newQueryShardContext(eq(shardId.id()), anyObject(), anyObject(), anyString())).thenReturn(queryShardContext);
|
||||
|
||||
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
|
||||
|
||||
try (Directory dir = newDirectory();
|
||||
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
|
||||
IndexReader reader = w.getReader();
|
||||
Engine.Searcher searcher = new Engine.Searcher("test", reader,
|
||||
IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(),
|
||||
IndexSearcher.getDefaultQueryCachingPolicy(), reader)) {
|
||||
|
||||
|
||||
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
|
||||
DefaultSearchContext context = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 1L), shardSearchRequest,
|
||||
target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, false, Version.CURRENT);
|
||||
|
||||
assertThat(context.searcher().hasCancellations(), is(false));
|
||||
context.searcher().addQueryCancellation(() -> {});
|
||||
assertThat(context.searcher().hasCancellations(), is(true));
|
||||
|
||||
context.close();
|
||||
assertThat(context.searcher().hasCancellations(), is(false));
|
||||
|
||||
} finally {
|
||||
threadPool.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue