diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 536d0b15b5f..cc8d7bfe18f 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -302,6 +302,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { } } + public long getParentLimit() { + return this.parentSettings.getLimit(); + } + /** * Checks whether the parent breaker has been tripped */ diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 52c58171c04..d967a3f423c 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -534,7 +534,7 @@ public class Node implements Closeable { final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), - responseCollectorService); + responseCollectorService, circuitBreakerService); final List> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() @@ -1035,9 +1035,10 @@ public class Node implements Closeable { */ protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, + CircuitBreakerService circuitBreakerService) { return new SearchService(clusterService, indicesService, threadPool, - scriptService, bigArrays, fetchPhase, responseCollectorService); + scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService); } /** diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index b175149a560..6c6f8ef3188 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -59,6 +60,7 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.script.FieldScript; @@ -197,7 +199,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public SearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase, - ResponseCollectorService responseCollectorService) { + ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; this.clusterService = clusterService; @@ -207,7 +209,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv this.bigArrays = bigArrays; this.queryPhase = new QueryPhase(); this.fetchPhase = fetchPhase; - this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings); + this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, + circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings)); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index b302c40c3bd..1fede42b694 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; @@ -41,11 +42,14 @@ public class MultiBucketConsumerService { public static final Setting MAX_BUCKET_SETTING = Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); + private final CircuitBreaker breaker; + private volatile int maxBucket; - public MultiBucketConsumerService(ClusterService clusterService, Settings settings) { - this.maxBucket = MAX_BUCKET_SETTING.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket); + public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreaker breaker) { + this.breaker = breaker; + this.maxBucket = MAX_BUCKET_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket); } private void setMaxBucket(int maxBucket) { @@ -94,11 +98,14 @@ public class MultiBucketConsumerService { */ public static class MultiBucketConsumer implements IntConsumer { private final int limit; + private final CircuitBreaker breaker; + // aggregations execute in a single thread so no atomic here private int count; - public MultiBucketConsumer(int limit) { + public MultiBucketConsumer(int limit, CircuitBreaker breaker) { this.limit = limit; + this.breaker = breaker; } @Override @@ -109,6 +116,11 @@ public class MultiBucketConsumerService { + "] but was [" + count + "]. This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); } + + // check parent circuit breaker every 1024 buckets + if (value > 0 && (count & 0x3FF) == 0) { + breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } } public void reset() { @@ -125,6 +137,6 @@ public class MultiBucketConsumerService { } public MultiBucketConsumer create() { - return new MultiBucketConsumer(maxBucket); + return new MultiBucketConsumer(maxBucket, breaker); } } diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 9918b0c6e61..21e9ad59e32 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.test.ESTestCase; import java.util.concurrent.atomic.AtomicBoolean; @@ -293,6 +294,32 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { } } + public void testAllocationBucketsBreaker() throws Exception { + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b") + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "false") + .build(); + + try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + + long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit(); + assertEquals(new ByteSizeValue(100, ByteSizeUnit.BYTES).getBytes(), parentLimitBytes); + + CircuitBreaker breaker = service.getBreaker(CircuitBreaker.REQUEST); + MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(10000, breaker); + + // make sure used bytes is greater than the total circuit breaker limit + breaker.addWithoutBreaking(200); + + CircuitBreakingException exception = + expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024)); + assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be")); + assertThat(exception.getMessage(), containsString("which is larger than the limit of [100/100b]")); + } + } + private long mb(long size) { return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes(); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 667dfef0dce..8c54d8405e1 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1362,7 +1362,7 @@ public class SnapshotResiliencyTests extends ESTestCase { final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService, - bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService); + bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService()); actions.put(SearchAction.INSTANCE, new TransportSearchAction(threadPool, transportService, searchService, searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService, diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 201abfac573..3e719960084 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -123,12 +123,14 @@ public class MockNode extends Node { @Override protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, + CircuitBreakerService circuitBreakerService) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, - responseCollectorService); + responseCollectorService, circuitBreakerService); } - return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + return new MockSearchService(clusterService, indicesService, threadPool, scriptService, + bigArrays, fetchPhase, circuitBreakerService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index c1ebb121349..9653f3b66a5 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -22,6 +22,7 @@ package org.elasticsearch.search; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.MockNode; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; @@ -68,8 +69,8 @@ public class MockSearchService extends SearchService { public MockSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, - BigArrays bigArrays, FetchPhase fetchPhase) { - super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null); + BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) { + super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 843ee063834..353772050d7 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -130,7 +131,7 @@ public abstract class AggregatorTestCase extends ESTestCase { IndexSearcher indexSearcher, MappedFieldType... fieldTypes) throws IOException { return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(), - new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes); + new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes); } protected A createAggregator(Query query, @@ -139,7 +140,7 @@ public abstract class AggregatorTestCase extends ESTestCase { IndexSettings indexSettings, MappedFieldType... fieldTypes) throws IOException { return createAggregator(query, aggregationBuilder, indexSearcher, indexSettings, - new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes); + new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes); } protected A createAggregator(Query query, AggregationBuilder aggregationBuilder, @@ -329,7 +330,8 @@ public abstract class AggregatorTestCase extends ESTestCase { AggregationBuilder builder, int maxBucket, MappedFieldType... fieldTypes) throws IOException { - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes); a.preCollection(); searcher.search(query, a); @@ -393,11 +395,13 @@ public abstract class AggregatorTestCase extends ESTestCase { List aggs = new ArrayList<> (); Query rewritten = searcher.rewrite(query); Weight weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f); - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); for (ShardSearcher subSearcher : subSearchers) { - MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes); a.preCollection(); subSearcher.search(weight, a); @@ -415,7 +419,8 @@ public abstract class AggregatorTestCase extends ESTestCase { Collections.shuffle(aggs, random()); int r = randomIntBetween(1, toReduceSize); List toReduce = aggs.subList(0, r); - MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, false); @@ -425,7 +430,8 @@ public abstract class AggregatorTestCase extends ESTestCase { aggs.add(reduced); } // now do the final reduce - MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket); + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index f9d72e38044..3cb7636c64d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -21,6 +21,7 @@ package org.elasticsearch.test; import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; @@ -269,7 +270,8 @@ public abstract class InternalAggregationTestCase Collections.shuffle(toReduce, random()); int r = randomIntBetween(1, toReduceSize); List internalAggregations = toReduce.subList(0, r); - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false); @SuppressWarnings("unchecked") @@ -285,7 +287,8 @@ public abstract class InternalAggregationTestCase toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); toReduce.add(reduced); } - MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true); @SuppressWarnings("unchecked")