From 1c4d07c96f060fd5fbd9e27a799071c178642985 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Fri, 17 Oct 2014 13:03:36 +0200
Subject: [PATCH] Allow setting individual breakers to "noop" breakers

This adds a NoopCircuitBreaker, and adds the settings
`indices.breaker.fielddata.type` and `indices.breaker.request.type`, which can
be set to "noop" in order to use a breaker that never breaks and incurs no
overhead during computation.

This also refactors the CircuitBreakerService tests to use @Before and @After
methods, and adds a setting in ElasticsearchIntegrationTest so that
integration tests occasionally run with a NOOP request breaker.
---
 .../ClusterDynamicSettingsModule.java         |   1 +
 .../common/breaker/CircuitBreaker.java        |  23 +
 .../common/breaker/NoopCircuitBreaker.java    |  73 ++++
 .../indices/breaker/BreakerSettings.java      |  11 +
 .../indices/breaker/CircuitBreakerModule.java |   2 +-
 .../HierarchyCircuitBreakerService.java       |  89 +++-
 .../breaker/NoneCircuitBreakerService.java    |  12 +-
 .../breaker/CircuitBreakerServiceTests.java   | 407 +++++++++---------
 .../test/ElasticsearchIntegrationTest.java    |  11 +
 9 files changed, 409 insertions(+), 220 deletions(-)
 create mode 100644 src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java

diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
index 8701eb8abcc..7dd30118866 100644
--- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
@@ -94,6 +94,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
         clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
         clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
         clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
+        clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING);
     }

     public void addDynamicSettings(String...
settings) { diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java index 541f6888a15..206a363410c 100644 --- a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Locale; /** * Interface for an object that can be incremented, breaking after some @@ -68,6 +69,28 @@ public interface CircuitBreaker { } } + public static enum Type { + // A regular or child MemoryCircuitBreaker + MEMORY, + // A special parent-type for the hierarchy breaker service + PARENT, + // A breaker where every action is a noop, it never breaks + NOOP; + + public static Type parseValue(String value) { + switch(value.toLowerCase(Locale.ROOT)) { + case "noop": + return Type.NOOP; + case "parent": + return Type.PARENT; + case "memory": + return Type.MEMORY; + default: + throw new ElasticsearchIllegalArgumentException("No CircuitBreaker with type: " + value); + } + } + } + /** * Trip the circuit breaker * @param fieldName name of the field responsible for tripping the breaker diff --git a/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java new file mode 100644 index 00000000000..c05ec917ff2 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.breaker; + +/** + * A CircuitBreaker that doesn't increment or adjust, and all operations are + * basically noops + */ +public class NoopCircuitBreaker implements CircuitBreaker { + + private final Name name; + + public NoopCircuitBreaker(Name name) { + this.name = name; + } + + @Override + public void circuitBreak(String fieldName, long bytesNeeded) { + // noop + } + + @Override + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + return 0; + } + + @Override + public long addWithoutBreaking(long bytes) { + return 0; + } + + @Override + public long getUsed() { + return 0; + } + + @Override + public long getLimit() { + return 0; + } + + @Override + public double getOverhead() { + return 0; + } + + @Override + public long getTrippedCount() { + return 0; + } + + @Override + public Name getName() { + return this.name; + } +} diff --git a/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java index b76e4b45dee..de218ad9918 100644 --- a/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java +++ b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java @@ -30,11 +30,17 @@ public class BreakerSettings { private final CircuitBreaker.Name name; private final long limitBytes; private final double overhead; + private final CircuitBreaker.Type type; public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead) { + this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY); + } + + public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead, CircuitBreaker.Type type) { this.name = name; this.limitBytes = limitBytes; this.overhead = overhead; + this.type = type; } public CircuitBreaker.Name getName() { @@ -49,9 +55,14 @@ public class BreakerSettings { return this.overhead; } + public CircuitBreaker.Type getType() { + return this.type; + } + @Override public String toString() { return "[" + this.name.toString() + + ",type=" + this.type.toString() + ",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) + ",overhead=" + this.overhead + "]"; } diff --git a/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java index 6033015ce54..66a1e746a6d 100644 --- a/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.settings.Settings; public class CircuitBreakerModule extends AbstractModule { - public static final String IMPL = "indices.breaker.breaker_impl"; + public static final String IMPL = "indices.breaker.type"; private final Settings settings; diff --git a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 2a7ae00b1c1..8135592c862 100644 --- a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; 
+import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -53,18 +54,21 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public static final String FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.fielddata.limit"; public static final String FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.fielddata.overhead"; + public static final String FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.fielddata.type"; public static final String DEFAULT_FIELDDATA_BREAKER_LIMIT = "60%"; public static final double DEFAULT_FIELDDATA_OVERHEAD_CONSTANT = 1.03; public static final String REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.request.limit"; public static final String REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.request.overhead"; + public static final String REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.request.type"; public static final String DEFAULT_REQUEST_BREAKER_LIMIT = "40%"; + public static final String DEFAULT_BREAKER_TYPE = "memory"; + private volatile BreakerSettings parentSettings; private volatile BreakerSettings fielddataSettings; private volatile BreakerSettings requestSettings; - // Tripped count for when redistribution was attempted but wasn't successful private final AtomicLong parentTripCount = new AtomicLong(0); @@ -92,24 +96,43 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { this.fielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, compatibilityFielddataLimitDefault).bytes(), - settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault)); + settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault), + CircuitBreaker.Type.parseValue(settings.get(FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, DEFAULT_BREAKER_TYPE)) + ); this.requestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_REQUEST_BREAKER_LIMIT).bytes(), - settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)); + settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0), + CircuitBreaker.Type.parseValue(settings.get(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, DEFAULT_BREAKER_TYPE)) + ); // Validate the configured settings validateSettings(new BreakerSettings[] {this.requestSettings, this.fielddataSettings}); this.parentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, - settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0); + settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0, CircuitBreaker.Type.PARENT); if (logger.isTraceEnabled()) { logger.trace("parent circuit breaker with settings {}", this.parentSettings); } Map tempBreakers = new HashMap<>(); - tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA)); - tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST)); + + CircuitBreaker fielddataBreaker; + if (fielddataSettings.getType() == CircuitBreaker.Type.NOOP) { + fielddataBreaker = new 
NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA); + } else { + fielddataBreaker = new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA); + } + + CircuitBreaker requestBreaker; + if (requestSettings.getType() == CircuitBreaker.Type.NOOP) { + requestBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.REQUEST); + } else { + requestBreaker = new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST); + } + + tempBreakers.put(CircuitBreaker.Name.FIELDDATA, fielddataBreaker); + tempBreakers.put(CircuitBreaker.Name.REQUEST, requestBreaker); this.breakers = ImmutableMap.copyOf(tempBreakers); nodeSettingsService.addListener(new ApplySettings()); @@ -121,6 +144,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public void onRefreshSettings(Settings settings) { boolean changed = false; + String newRequestType = settings.get(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, null); + // Fielddata settings BreakerSettings newFielddataSettings = HierarchyCircuitBreakerService.this.fielddataSettings; ByteSizeValue newFielddataMax = settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, null); @@ -130,19 +155,21 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes(); newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; - newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead); + newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead, + HierarchyCircuitBreakerService.this.fielddataSettings.getType()); } // Request settings BreakerSettings newRequestSettings = HierarchyCircuitBreakerService.this.requestSettings; ByteSizeValue newRequestMax = settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, null); Double newRequestOverhead = settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, null); - if (newRequestMax != null || newRequestOverhead != null) { + if (newRequestMax != null || newRequestOverhead != null || newRequestType != null) { changed = true; long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes(); newRequestOverhead = newRequestOverhead == null ? HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead; + CircuitBreaker.Type newType = newRequestType == null ? 
HierarchyCircuitBreakerService.this.requestSettings.getType() : CircuitBreaker.Type.parseValue(newRequestType); - newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead); + newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead, newType); } // Parent settings @@ -151,23 +178,51 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { ByteSizeValue newParentMax = settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, null); if (newParentMax != null && (newParentMax.bytes() != oldParentMax)) { changed = true; - newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0); + newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0, CircuitBreaker.Type.PARENT); } if (changed) { // change all the things - validateSettings(new BreakerSettings[] {newFielddataSettings, newRequestSettings}); + validateSettings(new BreakerSettings[]{newFielddataSettings, newRequestSettings}); logger.info("Updating settings parent: {}, fielddata: {}, request: {}", newParentSettings, newFielddataSettings, newRequestSettings); + CircuitBreaker.Type previousFielddataType = HierarchyCircuitBreakerService.this.fielddataSettings.getType(); + CircuitBreaker.Type previousRequestType = HierarchyCircuitBreakerService.this.requestSettings.getType(); HierarchyCircuitBreakerService.this.parentSettings = newParentSettings; HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings; HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings; + Map tempBreakers = new HashMap<>(); - tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(newFielddataSettings, - (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA), - logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA)); - tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(newRequestSettings, - (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST), - logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST)); + CircuitBreaker fielddataBreaker; + if (newFielddataSettings.getType() == CircuitBreaker.Type.NOOP) { + fielddataBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA); + } else { + if (previousFielddataType == CircuitBreaker.Type.MEMORY) { + fielddataBreaker = new ChildMemoryCircuitBreaker(newFielddataSettings, + (ChildMemoryCircuitBreaker) HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA), + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA); + } else { + fielddataBreaker = new ChildMemoryCircuitBreaker(newFielddataSettings, + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA); + + } + } + + CircuitBreaker requestBreaker; + if (newRequestSettings.getType() == CircuitBreaker.Type.NOOP) { + requestBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.REQUEST); + } else { + if (previousRequestType == CircuitBreaker.Type.MEMORY) { + requestBreaker = new ChildMemoryCircuitBreaker(newRequestSettings, + (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST), + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST); + } else { + requestBreaker = new ChildMemoryCircuitBreaker(newRequestSettings, + logger, 
HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST); + } + } + + tempBreakers.put(CircuitBreaker.Name.FIELDDATA, fielddataBreaker); + tempBreakers.put(CircuitBreaker.Name.REQUEST, requestBreaker); HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers); } } diff --git a/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java index c8d76ee9bfd..335216f3bfa 100644 --- a/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java @@ -20,28 +20,22 @@ package org.elasticsearch.indices.breaker; import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.unit.ByteSizeValue; /** * Class that returns a breaker that never breaks */ public class NoneCircuitBreakerService extends CircuitBreakerService { - private final ESLogger logger = Loggers.getLogger(NoneCircuitBreakerService.class); - - private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger); + private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA); public NoneCircuitBreakerService() { super(ImmutableSettings.EMPTY); } - @Override - public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) { + public CircuitBreaker getBreaker(CircuitBreaker.Name name) { return breaker; } diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java index 40aaf2899bd..65cf70344ef 100644 --- a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java @@ -22,21 +22,26 @@ package org.elasticsearch.indices.memory.breaker; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.indices.breaker.CircuitBreakerStats; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.List; +import static com.google.common.collect.Lists.newArrayList; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static 
org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -45,7 +50,8 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; /** * Integration tests for InternalCircuitBreakerService @@ -53,127 +59,123 @@ import static org.hamcrest.Matchers.*; @ClusterScope(scope = TEST, randomDynamicTemplates = false) public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { + /** Reset all breaker settings back to their defaults */ + private void reset() { + logger.info("--> resetting breaker settings"); + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_BREAKER_LIMIT) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, + HierarchyCircuitBreakerService.DEFAULT_BREAKER_TYPE) + .build(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + + @Before + public void setup() { + reset(); + } + + @After + public void teardown() { + reset(); + } + private String randomRidiculouslySmallLimit() { - // 3 different ways to say 100 bytes return randomFrom(Arrays.asList("100b", "100")); - //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes! 
} @Test - @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testMemoryBreaker() { + //@TestLogging("indices.breaker:TRACE,index.fielddata:TRACE,common.breaker:TRACE") + public void testMemoryBreaker() throws Exception { assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); final Client client = client(); - try { - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("cb-test", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - - // refresh - refresh(); - - // execute a search that loads field data (sorting on the "test" field) - client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") - .execute().actionGet(); - - // clear field data cache (thus setting the loaded field data back to 0) - client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); - - // Update circuit breaker settings - Settings settings = settingsBuilder() - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); - - // execute a search that loads field data (sorting on the "test" field) - // again, this time it should trip the breaker - assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); - - NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - } finally { - // Reset settings - Settings resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, - HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + List reqs = newArrayList(); + for (long id = 0; id < docCount; id++) { + reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", "value" + id)); } + indexRandom(true, false, true, reqs); + + // execute a search that loads field data (sorting on the "test" field) + SearchRequestBuilder searchRequest = client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC); + searchRequest.get(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + 
Settings settings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + assertFailures(searchRequest, RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); + + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); } @Test - @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testRamAccountingTermsEnum() { + public void testRamAccountingTermsEnum() throws Exception { final Client client = client(); - try { - // Create an index where the mappings have a field data filter - assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + - "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}")); + // Create an index where the mappings have a field data filter + assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + + "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}")); - // Wait 10 seconds for green - client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); + ensureGreen(TimeValue.timeValueSeconds(10), "ramtest"); - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("ramtest", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - - // refresh - refresh(); - - // execute a search that loads field data (sorting on the "test" field) - client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") - .execute().actionGet(); - - // clear field data cache (thus setting the loaded field data back to 0) - client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); - - // Update circuit breaker settings - Settings settings = settingsBuilder() - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); - - // execute a search that loads field data (sorting on the "test" field) - // again, this time it should trip the breaker - assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); - - 
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - - } finally { - // Reset settings - Settings resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, - HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + List reqs = newArrayList(); + for (long id = 0; id < docCount; id++) { + reqs.add(client.prepareIndex("ramtest", "type", Long.toString(id)).setSource("test", "value" + id)); } + indexRandom(true, reqs); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("ramtest").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + assertFailures(client.prepareSearch("ramtest").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); + + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); } /** @@ -181,108 +183,127 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { * this case, the fielddata breaker borrows space from the request breaker */ @Test - @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testParentChecking() { + public void testParentChecking() throws Exception { assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); Client client = client(); + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + List reqs = newArrayList(); + for (long id = 0; id < docCount; id++) { + reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", "value" + id)); + } + indexRandom(true, reqs); + + // We need the request limit beforehand, just from a single node because 
the limit should always be the same + long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get() + .getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit(); + + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // Perform a search to load field data for the "test" field try { - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("cb-test", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - refresh(); + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + fail("should have thrown an exception"); + } catch (Exception e) { + String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } - // We need the request limit beforehand, just from a single node because the limit should always be the same - long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get() - .getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit(); + assertFailures(client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [10/10b]")); - Settings resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + // Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't + resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "15b") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "90%") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - // Perform a search to load field data for the "test" field - try { - client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); - } catch (Exception e) { - String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]"; - assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", - ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); - } - - assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for [test] would be larger than limit of [10/10b]")); - - // Adjust settings so the 
parent breaker will fail, but the fielddata breaker doesn't - resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "15b") - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "90%") - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - - // Perform a search to load field data for the "test" field - try { - client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); - } catch (Exception e) { - String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]"; - assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", - ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); - } - - } finally { - Settings resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") - .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) - .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, - HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + // Perform a search to load field data for the "test" field + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + fail("should have thrown an exception"); + } catch (Exception e) { + String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); } } @Test - @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testRequestBreaker() { + public void testRequestBreaker() throws Exception { assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); Client client = client(); + // Make request breaker limited to a small amount + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + List reqs = newArrayList(); + for (long id = 0; id < docCount; id++) { + reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", id)); + } + indexRandom(true, reqs); + + // A cardinality aggregation uses BigArrays and thus the REQUEST breaker try { - // Make request breaker limited to a small amount - Settings resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - - // index some different terms so we have some field data for loading - int docCount = 
scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("cb-test", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", id).map()).execute().actionGet(); - } - refresh(); - - // A cardinality aggregation uses BigArrays and thus the REQUEST breaker - try { - client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); - fail("aggregation should have tripped the breaker"); - } catch (Exception e) { - String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]"; - assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", - ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); - } - } finally { - Settings resetSettings = settingsBuilder() - .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, - HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); + fail("aggregation should have tripped the breaker"); + } catch (Exception e) { + String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); } } + @Test + public void testNoopRequestBreaker() throws Exception { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + Client client = client(); + + // Make request breaker limited to a small amount + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + List reqs = newArrayList(); + for (long id = 0; id < docCount; id++) { + reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", id)); + } + indexRandom(true, reqs); + + // A cardinality aggregation uses BigArrays and thus the REQUEST breaker + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); + fail("aggregation should have tripped the breaker"); + } catch (Exception e) { + String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + + // Make request breaker into a noop breaker + resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, "noop") + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // A cardinality aggregation uses BigArrays and thus the REQUEST breaker + 
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); + } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 3ab54d19017..429b49d5be4 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -60,6 +60,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ImmutableSettings; @@ -90,6 +91,7 @@ import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.fs.FsTranslogFile; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; @@ -447,11 +449,20 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return compatibilityVersion().onOrAfter(Version.V_1_1_0); } + /** Rarely set the request breaker to a Noop breaker */ + protected static void setRandomBreakerSettings(Random random, ImmutableSettings.Builder builder) { + // Rarely + if (RandomInts.randomInt(random, 100) >= 90) { + builder.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, CircuitBreaker.Type.NOOP); + } + } + private static ImmutableSettings.Builder setRandomSettings(Random random, ImmutableSettings.Builder builder) { setRandomMerge(random, builder); setRandomTranslogSettings(random, builder); setRandomNormsLoading(random, builder); setRandomScriptingSettings(random, builder); + setRandomBreakerSettings(random, builder); if (random.nextBoolean()) { if (random.nextInt(10) == 0) { // do something crazy slow here builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 10), ByteSizeUnit.MB));
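
For illustration only, here is a minimal sketch of how the new dynamic request-breaker type setting could be toggled on a running cluster from the Java client, following the same transient-settings pattern the tests above use. The wrapper class and method names are placeholders; the setting constants and client calls are taken from the patch itself, and a `Client` instance is assumed to already exist:

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;

    public class NoopBreakerExample {

        /** Replace the request breaker with a noop breaker that never trips. */
        public static void useNoopRequestBreaker(Client client) {
            Settings noopSettings = ImmutableSettings.settingsBuilder()
                    .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, "noop")
                    .build();
            client.admin().cluster().prepareUpdateSettings()
                    .setTransientSettings(noopSettings)
                    .execute().actionGet();
        }

        /** Restore the default memory-based request breaker. */
        public static void useMemoryRequestBreaker(Client client) {
            Settings memorySettings = ImmutableSettings.settingsBuilder()
                    .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
                            HierarchyCircuitBreakerService.DEFAULT_BREAKER_TYPE)
                    .build();
            client.admin().cluster().prepareUpdateSettings()
                    .setTransientSettings(memorySettings)
                    .execute().actionGet();
        }
    }

Note that only `indices.breaker.request.type` is registered as a dynamic cluster setting in this patch; `indices.breaker.fielddata.type` is read from node settings at construction time, so switching the fielddata breaker to "noop" would be done via node configuration (indices.breaker.fielddata.type: noop) rather than a cluster settings update.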