diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java new file mode 100644 index 00000000000..b81adf5dc57 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.stream.IntStream; + +import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; + +public class CardinalityWithRequestBreakerIT extends ESIntegTestCase { + + /** + * Test that searches using cardinality aggregations returns all request breaker memory. + */ + public void testRequestBreaker() throws Exception { + final String requestBreaker = randomIntBetween(1, 10000) + "kb"; + logger.info("--> Using request breaker setting: {}", requestBreaker); + + indexRandom(true, IntStream.range(0, randomIntBetween(10, 1000)) + .mapToObj(i -> + client().prepareIndex("test", "_doc").setId("id_" + i) + .setSource(org.elasticsearch.common.collect.Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5))) + ).toArray(IndexRequestBuilder[]::new)); + + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), + requestBreaker)) + .get(); + + try { + client().prepareSearch("test") + .addAggregation(terms("terms").field("field0.keyword") + .collectMode(randomFrom(Aggregator.SubAggCollectionMode.values())) + .order(BucketOrder.aggregation("cardinality", randomBoolean())) + .subAggregation(cardinality("cardinality").precisionThreshold(randomLongBetween(1, 40000)).field("field1.keyword"))) + .get(); + } catch (ElasticsearchException e) { + if (ExceptionsHelper.unwrap(e, CircuitBreakingException.class) == null) { + throw e; + } + } + + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())) + .get(); + + // validation done by InternalTestCluster.ensureEstimatedStats() + } +} diff --git a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java index 81b6cbbb56e..205ba2660a7 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java @@ -114,6 +114,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { this.addWithoutBreaking(-bytes); throw e; } + assert newUsed >= 0 : "Used bytes: [" + newUsed + "] must be >= 0"; return newUsed; } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 1f71e2d65ab..33843636509 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -191,7 +191,7 @@ final class DefaultSearchContext extends SearchContext { @Override public void doClose() { - Releasables.close(engineSearcher); + Releasables.close(engineSearcher, searcher); } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java index cd1cbb92a7f..50315b9f939 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java @@ -124,8 +124,8 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue if (collector != null) { try { collector.postCollect(); - collector.close(); } finally { + collector.close(); collector = null; } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index f30bd14184d..043753e1389 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -49,6 +49,7 @@ import org.apache.lucene.util.BitSetIterator; import org.apache.lucene.util.Bits; import org.apache.lucene.util.CombinedBitSet; import org.apache.lucene.util.SparseFixedBitSet; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.dfs.AggregatedDfs; @@ -70,7 +71,7 @@ import java.util.Set; /** * Context-aware extension of {@link IndexSearcher}. */ -public class ContextIndexSearcher extends IndexSearcher { +public class ContextIndexSearcher extends IndexSearcher implements Releasable { /** * The interval at which we check for search cancellation when we cannot use * a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}. @@ -118,6 +119,19 @@ public class ContextIndexSearcher extends IndexSearcher { this.cancellable.remove(action); } + @Override + public void close() { + // clear the list of cancellables when closing the owning search context, since the ExitableDirectoryReader might be cached (for + // instance in fielddata cache). + // A cancellable can contain an indirect reference to the search context, which potentially retains a significant amount + // of memory. + this.cancellable.clear(); + } + + public boolean hasCancellations() { + return this.cancellable.isEnabled(); + } + public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { this.aggregatedDfs = aggregatedDfs; } @@ -361,5 +375,9 @@ public class ContextIndexSearcher extends IndexSearcher { public boolean isEnabled() { return runnables.isEmpty() == false; } + + public void clear() { + runnables.clear(); + } } } diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 9abca647522..bdd8e3fb943 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -58,10 +58,14 @@ import org.elasticsearch.search.rescore.RescoreContext; import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.UUID; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -203,4 +207,47 @@ public class DefaultSearchContextTests extends ESTestCase { } } + + public void testClearQueryCancellationsOnClose() throws IOException { + TimeValue timeout = new TimeValue(randomIntBetween(1, 100)); + ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class); + when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT); + ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); + when(shardSearchRequest.shardId()).thenReturn(shardId); + + ThreadPool threadPool = new TestThreadPool(this.getClass().getName()); + IndexShard indexShard = mock(IndexShard.class); + QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class); + when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy); + when(indexShard.getThreadPool()).thenReturn(threadPool); + + IndexService indexService = mock(IndexService.class); + QueryShardContext queryShardContext = mock(QueryShardContext.class); + when(indexService.newQueryShardContext(eq(shardId.id()), anyObject(), anyObject(), anyString())).thenReturn(queryShardContext); + + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + + try (Directory dir = newDirectory(); + RandomIndexWriter w = new RandomIndexWriter(random(), dir); + IndexReader reader = w.getReader(); + Engine.Searcher searcher = new Engine.Searcher("test", reader, + IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), reader)) { + + + SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); + DefaultSearchContext context = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 1L), shardSearchRequest, + target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, false, Version.CURRENT); + + assertThat(context.searcher().hasCancellations(), is(false)); + context.searcher().addQueryCancellation(() -> {}); + assertThat(context.searcher().hasCancellations(), is(true)); + + context.close(); + assertThat(context.searcher().hasCancellations(), is(false)); + + } finally { + threadPool.shutdown(); + } + } }