diff --git a/core/src/main/java/org/elasticsearch/common/util/BigArrays.java b/core/src/main/java/org/elasticsearch/common/util/BigArrays.java index da49aac5ed0..6a15a3d9000 100644 --- a/core/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/core/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -429,6 +429,10 @@ public class BigArrays implements Releasable { return this.circuitBreakingInstance; } + public CircuitBreakerService breakerService() { + return this.circuitBreakingInstance.breakerService; + } + private T resizeInPlace(T array, long newSize) { final long oldMemSize = array.ramBytesUsed(); array.resize(newSize); diff --git a/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 328b9564a5d..65571482093 100644 --- a/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -57,7 +57,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { new Setting<>("indices.breaker.fielddata.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); public static final Setting REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = - Setting.byteSizeSetting("indices.breaker.request.limit", "40%", Property.Dynamic, Property.NodeScope); + Setting.byteSizeSetting("indices.breaker.request.limit", "60%", Property.Dynamic, Property.NodeScope); public static final Setting REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.doubleSetting("indices.breaker.request.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope); public static final Setting REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = @@ -98,7 +98,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings) ); - this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0, CircuitBreaker.Type.PARENT); + this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, + TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0, + CircuitBreaker.Type.PARENT); + if (logger.isTraceEnabled()) { logger.trace("parent circuit breaker with settings {}", this.parentSettings); } @@ -137,7 +140,6 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { registerBreaker(newFielddataSettings); HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings; logger.info("Updated breaker settings field data: {}", newFielddataSettings); - } private boolean validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) { @@ -184,7 +186,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { } // Manually add the parent breaker settings since they aren't part of the breaker map allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(), - parentEstimated, 1.0, parentTripCount.get())); + parentEstimated, 1.0, parentTripCount.get())); return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()])); } @@ -207,8 +209,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { if (totalUsed > parentLimit) { this.parentTripCount.incrementAndGet(); throw new CircuitBreakingException("[parent] Data too large, data for [" + - label + "] would be larger than limit of [" + - parentLimit + "/" + new ByteSizeValue(parentLimit) + "]", + label + "] would be larger than limit of [" + + parentLimit + "/" + new ByteSizeValue(parentLimit) + "]", totalUsed, parentLimit); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 04023b04977..c99da85f331 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -18,7 +18,10 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector; import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -37,6 +40,9 @@ import java.util.Map; */ public abstract class AggregatorBase extends Aggregator { + /** The default "weight" that a bucket takes when performing an aggregation */ + public static final int DEFAULT_WEIGHT = 1024 * 5; // 5kb + protected final String name; protected final Aggregator parent; protected final AggregationContext context; @@ -48,6 +54,8 @@ public abstract class AggregatorBase extends Aggregator { private Map subAggregatorbyName; private DeferringBucketCollector recordingWrapper; private final List pipelineAggregators; + private final CircuitBreakerService breakerService; + private boolean failed = false; /** * Constructs a new Aggregator. @@ -65,6 +73,7 @@ public abstract class AggregatorBase extends Aggregator { this.metaData = metaData; this.parent = parent; this.context = context; + this.breakerService = context.bigArrays().breakerService(); assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.subAggregators = factories.createSubAggregators(this); context.searchContext().addReleasable(this, Lifetime.PHASE); @@ -96,6 +105,14 @@ public abstract class AggregatorBase extends Aggregator { return false; // unreachable } }; + try { + this.breakerService + .getBreaker(CircuitBreaker.REQUEST) + .addEstimateBytesAndMaybeBreak(DEFAULT_WEIGHT, ""); + } catch (CircuitBreakingException cbe) { + this.failed = true; + throw cbe; + } } /** @@ -245,7 +262,13 @@ public abstract class AggregatorBase extends Aggregator { /** Called upon release of the aggregator. */ @Override public void close() { - doClose(); + try { + doClose(); + } finally { + if (!this.failed) { + this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-DEFAULT_WEIGHT); + } + } } /** Release instance-specific data. */ diff --git a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index b448f35c21b..361bca6076a 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -48,6 +48,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.After; import org.junit.Before; @@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; @@ -316,9 +318,49 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase { client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); fail("aggregation should have tripped the breaker"); } catch (Exception e) { - String errMsg = "CircuitBreakingException[[request] Data too large, data for [] would be larger than limit of [10/10b]]"; + String errMsg = "CircuitBreakingException[[request] Data too large"; assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", e.toString(), containsString(errMsg)); + errMsg = "would be larger than limit of [10/10b]]"; + assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", e.toString(), containsString(errMsg)); + } + } + + public void testBucketBreaker() throws Exception { + if (noopBreakerUsed()) { + logger.info("--> noop breakers used, skipping test"); + return; + } + assertAcked(prepareCreate("cb-test", 1, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + Client client = client(); + + // Make request breaker limited to a small amount + Settings resetSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b") + .build(); + assertAcked(client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings)); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(100, 1000); + List reqs = new ArrayList<>(); + for (long id = 0; id < docCount; id++) { + reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", id)); + } + indexRandom(true, reqs); + + // A terms aggregation on the "test" field should trip the bucket circuit breaker + try { + SearchResponse resp = client.prepareSearch("cb-test") + .setQuery(matchAllQuery()) + .addAggregation(terms("my_terms").field("test")) + .get(); + assertTrue("there should be shard failures", resp.getFailedShards() > 0); + fail("aggregation should have tripped the breaker"); + } catch (Exception e) { + String errMsg = "CircuitBreakingException[[request] " + + "Data too large, data for [] would be larger than limit of [100/100b]]"; + assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException", + e.toString(), containsString(errMsg)); } } diff --git a/docs/reference/modules/indices/circuit_breaker.asciidoc b/docs/reference/modules/indices/circuit_breaker.asciidoc index 762833d527b..fed1c350274 100644 --- a/docs/reference/modules/indices/circuit_breaker.asciidoc +++ b/docs/reference/modules/indices/circuit_breaker.asciidoc @@ -47,7 +47,7 @@ request) from exceeding a certain amount of memory. `indices.breaker.request.limit`:: - Limit for request breaker, defaults to 40% of JVM heap + Limit for request breaker, defaults to 60% of JVM heap `indices.breaker.request.overhead`:: @@ -74,4 +74,3 @@ memory on a node. The memory usage is based on the content length of the request [[http-circuit-breaker]] [float] -