Circuit break on aggregation bucket numbers with request breaker

This adds new circuit breaking with the "request" breaker, which adds
circuit breaks based on the number of buckets created during
aggregations. It consists of incrementing during AggregatorBase creation

This also bumps the REQUEST breaker to 60% of the JVM heap now.

The output when circuit breaking an aggregation looks like:

```json
{
  "shard" : 0,
  "index" : "i",
  "node" : "a5AvjUn_TKeTNYl0FyBW2g",
  "reason" : {
    "type" : "exception",
    "reason" : "java.util.concurrent.ExecutionException: QueryPhaseExecutionException[Query Failed [Failed to execute main query]]; nested: CircuitBreakingException[[request] Data too large, data for [<agg [otherthings]>] would be larger than limit of [104857600/100mb]];",
    "caused_by" : {
      "type" : "execution_exception",
      "reason" : "QueryPhaseExecutionException[Query Failed [Failed to execute main query]]; nested: CircuitBreakingException[[request] Data too large, data for [<agg [myagg]>] would be larger than limit of [104857600/100mb]];",
      "caused_by" : {
        "type" : "circuit_breaking_exception",
        "reason" : "[request] Data too large, data for [<agg [otherthings]>] would be larger than limit of [104857600/100mb]",
        "bytes_wanted" : 104860781,
        "bytes_limit" : 104857600
      }
    }
  }
}
```

Relates to #14046
This commit is contained in:
Lee Hinman 2015-10-22 22:21:18 -06:00
parent 075cb970c0
commit 124a9fabe3
5 changed files with 80 additions and 10 deletions

View File

@ -429,6 +429,10 @@ public class BigArrays implements Releasable {
return this.circuitBreakingInstance;
}
public CircuitBreakerService breakerService() {
return this.circuitBreakingInstance.breakerService;
}
private <T extends AbstractBigArray> T resizeInPlace(T array, long newSize) {
final long oldMemSize = array.ramBytesUsed();
array.resize(newSize);

View File

@ -57,7 +57,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
new Setting<>("indices.breaker.fielddata.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);
public static final Setting<ByteSizeValue> REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.byteSizeSetting("indices.breaker.request.limit", "40%", Property.Dynamic, Property.NodeScope);
Setting.byteSizeSetting("indices.breaker.request.limit", "60%", Property.Dynamic, Property.NodeScope);
public static final Setting<Double> REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING =
Setting.doubleSetting("indices.breaker.request.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope);
public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
@ -98,7 +98,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0, CircuitBreaker.Type.PARENT);
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT,
TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0,
CircuitBreaker.Type.PARENT);
if (logger.isTraceEnabled()) {
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
}
@ -137,7 +140,6 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
registerBreaker(newFielddataSettings);
HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
logger.info("Updated breaker settings field data: {}", newFielddataSettings);
}
private boolean validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) {

View File

@ -18,7 +18,10 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -37,6 +40,9 @@ import java.util.Map;
*/
public abstract class AggregatorBase extends Aggregator {
/** The default "weight" that a bucket takes when performing an aggregation */
public static final int DEFAULT_WEIGHT = 1024 * 5; // 5kb
protected final String name;
protected final Aggregator parent;
protected final AggregationContext context;
@ -48,6 +54,8 @@ public abstract class AggregatorBase extends Aggregator {
private Map<String, Aggregator> subAggregatorbyName;
private DeferringBucketCollector recordingWrapper;
private final List<PipelineAggregator> pipelineAggregators;
private final CircuitBreakerService breakerService;
private boolean failed = false;
/**
* Constructs a new Aggregator.
@ -65,6 +73,7 @@ public abstract class AggregatorBase extends Aggregator {
this.metaData = metaData;
this.parent = parent;
this.context = context;
this.breakerService = context.bigArrays().breakerService();
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
this.subAggregators = factories.createSubAggregators(this);
context.searchContext().addReleasable(this, Lifetime.PHASE);
@ -96,6 +105,14 @@ public abstract class AggregatorBase extends Aggregator {
return false; // unreachable
}
};
try {
this.breakerService
.getBreaker(CircuitBreaker.REQUEST)
.addEstimateBytesAndMaybeBreak(DEFAULT_WEIGHT, "<agg [" + name + "]>");
} catch (CircuitBreakingException cbe) {
this.failed = true;
throw cbe;
}
}
/**
@ -245,7 +262,13 @@ public abstract class AggregatorBase extends Aggregator {
/** Called upon release of the aggregator. */
@Override
public void close() {
try {
doClose();
} finally {
if (!this.failed) {
this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-DEFAULT_WEIGHT);
}
}
}
/** Release instance-specific data. */

View File

@ -48,6 +48,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;
import org.junit.Before;
@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
@ -316,9 +318,49 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get();
fail("aggregation should have tripped the breaker");
} catch (Exception e) {
String errMsg = "CircuitBreakingException[[request] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
String errMsg = "CircuitBreakingException[[request] Data too large";
assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException",
e.toString(), containsString(errMsg));
errMsg = "would be larger than limit of [10/10b]]";
assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", e.toString(), containsString(errMsg));
}
}
public void testBucketBreaker() throws Exception {
if (noopBreakerUsed()) {
logger.info("--> noop breakers used, skipping test");
return;
}
assertAcked(prepareCreate("cb-test", 1, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
Client client = client();
// Make request breaker limited to a small amount
Settings resetSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b")
.build();
assertAcked(client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings));
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(100, 1000);
List<IndexRequestBuilder> reqs = new ArrayList<>();
for (long id = 0; id < docCount; id++) {
reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", id));
}
indexRandom(true, reqs);
// A terms aggregation on the "test" field should trip the bucket circuit breaker
try {
SearchResponse resp = client.prepareSearch("cb-test")
.setQuery(matchAllQuery())
.addAggregation(terms("my_terms").field("test"))
.get();
assertTrue("there should be shard failures", resp.getFailedShards() > 0);
fail("aggregation should have tripped the breaker");
} catch (Exception e) {
String errMsg = "CircuitBreakingException[[request] " +
"Data too large, data for [<agg [my_terms]>] would be larger than limit of [100/100b]]";
assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException",
e.toString(), containsString(errMsg));
}
}

View File

@ -47,7 +47,7 @@ request) from exceeding a certain amount of memory.
`indices.breaker.request.limit`::
Limit for request breaker, defaults to 40% of JVM heap
Limit for request breaker, defaults to 60% of JVM heap
`indices.breaker.request.overhead`::
@ -74,4 +74,3 @@ memory on a node. The memory usage is based on the content length of the request
[[http-circuit-breaker]]
[float]