From 15aba60c02c04257243bcc4cec95cff9272ac24d Mon Sep 17 00:00:00 2001 From: Benjamin Trent <ben.w.trent@gmail.com> Date: Fri, 29 May 2020 12:13:46 -0400 Subject: [PATCH] [7.x] Add new circuitbreaker plugin and refactor CircuitBreakerService (#55695) (#57359) * Add new circuitbreaker plugin and refactor CircuitBreakerService (#55695) This commit lays the ground work for plugins supplying their own circuit breakers. It adds a new interface: `CircuitBreakerPlugin`. This interface provides methods for providing custom child CircuitBreaker objects. There are also facilities for allowing dynamic settings for the custom breakers. With the refactor, circuit breakers are no longer replaced on setting changes. Instead, the two mutable settings themselves are `volatile`. Plugins that want to use their custom circuit breaker should keep a reference of their constructed breaker. --- .../breaker/CircuitBreakerServiceIT.java | 35 +--- .../client/transport/TransportClient.java | 1 + .../breaker/ChildMemoryCircuitBreaker.java | 79 ++++---- .../common/breaker/CircuitBreaker.java | 8 + .../common/breaker/NoopCircuitBreaker.java | 3 + .../common/settings/ClusterSettings.java | 4 + .../indices/breaker/BreakerSettings.java | 62 +++++- .../breaker/CircuitBreakerService.java | 5 - .../HierarchyCircuitBreakerService.java | 179 +++++++----------- .../breaker/NoneCircuitBreakerService.java | 4 - .../java/org/elasticsearch/node/Node.java | 20 +- .../plugins/CircuitBreakerPlugin.java | 59 ++++++ .../common/util/BigArraysTests.java | 3 + .../discovery/ZenFaultDetectionTests.java | 2 +- .../indices/breaker/BreakerSettingsTests.java | 60 ++++++ .../HierarchyCircuitBreakerServiceTests.java | 57 +++++- .../breaker/CircuitBreakerUnitTests.java | 17 -- .../org/elasticsearch/node/NodeTests.java | 50 +++++ .../rest/RestControllerTests.java | 1 + .../rest/RestHttpResponseHeadersTests.java | 1 + .../index/shard/IndexShardTestCase.java | 4 +- .../index/engine/FrozenEngineTests.java | 3 + .../bucket/geogrid/GeoGridTilerTests.java | 7 +- 23 files changed, 431 insertions(+), 233 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/plugins/CircuitBreakerPlugin.java create mode 100644 server/src/test/java/org/elasticsearch/indices/breaker/BreakerSettingsTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index 01de6cbd2eb..9d9770e8554 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -41,8 +41,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.indices.breaker.BreakerSettings; -import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerStats; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.rest.RestStatus; @@ -316,37 +314,6 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase { }, 30, TimeUnit.SECONDS); } - public void testCustomCircuitBreakerRegistration() throws Exception { - Iterable<CircuitBreakerService> serviceIter = internalCluster().getInstances(CircuitBreakerService.class); - - final String breakerName = "customBreaker"; - BreakerSettings breakerSettings = new BreakerSettings(breakerName, 8, 1.03); - CircuitBreaker breaker = null; - - for (CircuitBreakerService s : serviceIter) { - s.registerBreaker(breakerSettings); - breaker = s.getBreaker(breakerSettings.getName()); - } - - if (breaker != null) { - try { - breaker.addEstimateBytesAndMaybeBreak(16, "test"); - } catch (CircuitBreakingException e) { - // ignore, we forced a circuit break - } - } - - NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().clear() - .addMetric(BREAKER.metricName()) - .get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - CircuitBreakerStats breakerStats = stat.getBreaker().getStats(breakerName); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - } - public void testCanResetUnreasonableSettings() { if (noopBreakerUsed()) { logger.info("--> noop breakers used, skipping test"); @@ -367,7 +334,7 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase { } - public void testLimitsRequestSize() throws Exception { + public void testLimitsRequestSize() { ByteSizeValue inFlightRequestsLimit = new ByteSizeValue(8, ByteSizeUnit.KB); if (noopBreakerUsed()) { logger.info("--> noop breakers used, skipping test"); diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 8bb936aacec..4ed808dd5b7 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -191,6 +191,7 @@ public abstract class TransportClient extends AbstractClient { modules.add(actionModule); CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(), + Collections.emptyList(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); diff --git a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java index 2aa6cde9691..81b6cbbb56e 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.breaker; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.indices.breaker.BreakerSettings; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; @@ -31,8 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ChildMemoryCircuitBreaker implements CircuitBreaker { - private final long memoryBytesLimit; - private final double overheadConstant; + private volatile LimitAndOverhead limitAndOverhead; private final Durability durability; private final AtomicLong used; private final AtomicLong trippedCount; @@ -40,19 +40,6 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { private final HierarchyCircuitBreakerService parent; private final String name; - /** - * Create a circuit breaker that will break if the number of estimated - * bytes grows above the limit. All estimations will be multiplied by - * the given overheadConstant. This breaker starts with 0 bytes used. - * @param settings settings to configure this breaker - * @param parent parent circuit breaker service to delegate tripped breakers to - * @param name the name of the breaker - */ - public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger, - HierarchyCircuitBreakerService parent, String name) { - this(settings, null, logger, parent, name); - } - /** * Create a circuit breaker that will break if the number of estimated * bytes grows above the limit. All estimations will be multiplied by @@ -61,25 +48,15 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { * @param settings settings to configure this breaker * @param parent parent circuit breaker service to delegate tripped breakers to * @param name the name of the breaker - * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset) */ - public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker, - Logger logger, HierarchyCircuitBreakerService parent, String name) { + public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger, HierarchyCircuitBreakerService parent, String name) { this.name = name; - this.memoryBytesLimit = settings.getLimit(); - this.overheadConstant = settings.getOverhead(); + this.limitAndOverhead = new LimitAndOverhead(settings.getLimit(), settings.getOverhead()); this.durability = settings.getDurability(); - if (oldBreaker == null) { - this.used = new AtomicLong(0); - this.trippedCount = new AtomicLong(0); - } else { - this.used = oldBreaker.used; - this.trippedCount = oldBreaker.trippedCount; - } + this.used = new AtomicLong(0); + this.trippedCount = new AtomicLong(0); this.logger = logger; - if (logger.isTraceEnabled()) { - logger.trace("creating ChildCircuitBreaker with settings {}", settings); - } + logger.trace(() -> new ParameterizedMessage("creating ChildCircuitBreaker with settings {}", settings)); this.parent = parent; } @@ -89,12 +66,13 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { */ @Override public void circuitBreak(String fieldName, long bytesNeeded) { + final long memoryBytesLimit = this.limitAndOverhead.limit; this.trippedCount.incrementAndGet(); final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" + " would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" + ", which is larger than the limit of [" + memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]"; - logger.debug("{}", message); + logger.debug(() -> new ParameterizedMessage("{}", message)); throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability); } @@ -108,6 +86,9 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { */ @Override public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + final LimitAndOverhead limitAndOverhead = this.limitAndOverhead; + final long memoryBytesLimit = limitAndOverhead.limit; + final double overheadConstant = limitAndOverhead.overhead; // short-circuit on no data allowed, immediately throwing an exception if (memoryBytesLimit == 0) { circuitBreak(label, bytes); @@ -117,10 +98,10 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { // If there is no limit (-1), we can optimize a bit by using // .addAndGet() instead of looping (because we don't have to check a // limit), which makes the RamAccountingTermsEnum case faster. - if (this.memoryBytesLimit == -1) { + if (memoryBytesLimit == -1) { newUsed = noLimit(bytes, label); } else { - newUsed = limit(bytes, label); + newUsed = limit(bytes, label, overheadConstant, memoryBytesLimit); } // Additionally, we need to check that we haven't exceeded the parent's limit @@ -139,14 +120,12 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { private long noLimit(long bytes, String label) { long newUsed; newUsed = this.used.addAndGet(bytes); - if (logger.isTraceEnabled()) { - logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", - this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)); - } + logger.trace(() -> new ParameterizedMessage("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", + this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed))); return newUsed; } - private long limit(long bytes, String label) { + private long limit(long bytes, String label, double overheadConstant, long memoryBytesLimit) { long newUsed;// Otherwise, check the addition and commit the addition, looping if // there are conflicts. May result in additional logging, but it's // trace logging and shouldn't be counted on for additions. @@ -188,9 +167,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { @Override public long addWithoutBreaking(long bytes) { long u = used.addAndGet(bytes); - if (logger.isTraceEnabled()) { - logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u); - } + logger.trace(() -> new ParameterizedMessage("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u)); assert u >= 0 : "Used bytes: [" + u + "] must be >= 0"; return u; } @@ -208,7 +185,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { */ @Override public long getLimit() { - return this.memoryBytesLimit; + return this.limitAndOverhead.limit; } /** @@ -216,7 +193,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { */ @Override public double getOverhead() { - return this.overheadConstant; + return this.limitAndOverhead.overhead; } /** @@ -242,4 +219,20 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { public Durability getDurability() { return this.durability; } + + @Override + public void setLimitAndOverhead(long limit, double overhead) { + this.limitAndOverhead = new LimitAndOverhead(limit, overhead); + } + + private static class LimitAndOverhead { + + private final long limit; + private final double overhead; + + LimitAndOverhead(long limit, double overhead) { + this.limit = limit; + this.overhead = overhead; + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java index 6cc818e03ac..218e096644f 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -139,4 +139,12 @@ public interface CircuitBreaker { * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent). */ Durability getDurability(); + + /** + * sets the new limit and overhead values for the circuit breaker. + * The resulting write should be readable by other threads. + * @param limit the desired limit + * @param overhead the desired overhead constant + */ + void setLimitAndOverhead(long limit, double overhead); } diff --git a/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java index 84b9e91e4a8..8b403fcb87d 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java @@ -76,4 +76,7 @@ public class NoopCircuitBreaker implements CircuitBreaker { public Durability getDurability() { return Durability.PERMANENT; } + + @Override + public void setLimitAndOverhead(long limit, double overhead) { } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e2f5e8fec36..32ab04d9d7f 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -90,6 +90,7 @@ import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.analysis.HunspellService; +import org.elasticsearch.indices.breaker.BreakerSettings; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -200,6 +201,9 @@ public final class ClusterSettings extends AbstractScopedSettings { BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, + BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, + BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, + BreakerSettings.CIRCUIT_BREAKER_TYPE, ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java b/server/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java index 8fd3aa35e9c..7fc974b13a5 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java @@ -20,6 +20,8 @@ package org.elasticsearch.indices.breaker; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; /** @@ -27,12 +29,70 @@ import org.elasticsearch.common.unit.ByteSizeValue; */ public final class BreakerSettings { + private static final String BREAKER_SETTING_PREFIX = "breaker."; + private static final String BREAKER_LIMIT_SUFFIX = "limit"; + private static final String BREAKER_OVERHEAD_SUFFIX = "overhead"; + private static final String BREAKER_TYPE_SUFFIX = "type"; + + public static final Setting.AffixSetting<ByteSizeValue> CIRCUIT_BREAKER_LIMIT_SETTING = + Setting.affixKeySetting(BREAKER_SETTING_PREFIX, + BREAKER_LIMIT_SUFFIX, + name -> Setting.memorySizeSetting(name, "100%", Setting.Property.Dynamic, Setting.Property.NodeScope)); + static String breakerLimitSettingKey(String breakerName) { + return BREAKER_SETTING_PREFIX + breakerName + "." + BREAKER_LIMIT_SUFFIX; + } + + public static final Setting.AffixSetting<Double> CIRCUIT_BREAKER_OVERHEAD_SETTING = + Setting.affixKeySetting(BREAKER_SETTING_PREFIX, + BREAKER_OVERHEAD_SUFFIX, + name -> Setting.doubleSetting(name, 2.0d, 0.0d, Setting.Property.Dynamic, Setting.Property.NodeScope)); + static String breakerOverheadSettingKey(String breakerName) { + return BREAKER_SETTING_PREFIX + breakerName + "." + BREAKER_OVERHEAD_SUFFIX; + } + + public static final Setting.AffixSetting<CircuitBreaker.Type> CIRCUIT_BREAKER_TYPE = + Setting.affixKeySetting(BREAKER_SETTING_PREFIX, + BREAKER_TYPE_SUFFIX, + name -> new Setting<>(name, + "noop", + CircuitBreaker.Type::parseValue, + (type) -> { + if (CircuitBreaker.Type.PARENT.equals(type)) { + throw new IllegalArgumentException( + "Invalid circuit breaker type [parent]. Only [memory] or [noop] are configurable" + ); + } + }, + Setting.Property.NodeScope)); + static String breakerTypeSettingKey(String breakerName) { + return BREAKER_SETTING_PREFIX + breakerName + "." + BREAKER_TYPE_SUFFIX; + } + private final String name; private final long limitBytes; private final double overhead; private final CircuitBreaker.Type type; private final CircuitBreaker.Durability durability; + public static BreakerSettings updateFromSettings(BreakerSettings defaultSettings, Settings currentSettings) { + final String breakerName = defaultSettings.name; + return new BreakerSettings(breakerName, + getOrDefault(CIRCUIT_BREAKER_LIMIT_SETTING.getConcreteSetting(breakerLimitSettingKey(breakerName)), + new ByteSizeValue(defaultSettings.limitBytes), + currentSettings).getBytes(), + getOrDefault(CIRCUIT_BREAKER_OVERHEAD_SETTING.getConcreteSetting(breakerOverheadSettingKey(breakerName)), + defaultSettings.overhead, + currentSettings), + getOrDefault(CIRCUIT_BREAKER_TYPE.getConcreteSetting(breakerTypeSettingKey(breakerName)), + defaultSettings.type, + currentSettings), + defaultSettings.durability); + } + + private static <T> T getOrDefault(Setting<T> concreteSetting, T defaultValue, Settings settings) { + return concreteSetting.exists(settings) ? concreteSetting.get(settings) : defaultValue; + } + public BreakerSettings(String name, long limitBytes, double overhead) { this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY, CircuitBreaker.Durability.PERMANENT); } @@ -69,7 +129,7 @@ public final class BreakerSettings { public String toString() { return "[" + this.name + ",type=" + this.type.toString() + - ",durability=" + this.durability.toString() + + ",durability=" + (this.durability == null ? "null" : this.durability.toString()) + ",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) + ",overhead=" + this.overhead + "]"; } diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java index 90a814b29b3..ce74e512b3a 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java @@ -34,11 +34,6 @@ public abstract class CircuitBreakerService extends AbstractLifecycleComponent { protected CircuitBreakerService() { } - /** - * Allows to register of a custom circuit breaker. - */ - public abstract void registerBreaker(BreakerSettings breakerSettings); - /** * @return the breaker that can be used to register estimates against */ diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index cc8d7bfe18f..86c407f8040 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices.breaker; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -34,12 +35,16 @@ import org.elasticsearch.common.unit.ByteSizeValue; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING; +import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING; + /** * CircuitBreakerService that attempts to redistribute space between breakers * if tripped @@ -51,7 +56,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); - private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>(); + private final Map<String, CircuitBreaker> breakers; public static final Setting<Boolean> USE_REAL_MEMORY_USAGE_SETTING = Setting.boolSetting("indices.breaker.total.use_real_memory", true, Property.NodeScope); @@ -95,120 +100,93 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { private final boolean trackRealMemoryUsage; private volatile BreakerSettings parentSettings; - private volatile BreakerSettings fielddataSettings; - private volatile BreakerSettings inFlightRequestsSettings; - private volatile BreakerSettings requestSettings; - private volatile BreakerSettings accountingSettings; // Tripped count for when redistribution was attempted but wasn't successful private final AtomicLong parentTripCount = new AtomicLong(0); - public HierarchyCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) { + public HierarchyCircuitBreakerService(Settings settings, List<BreakerSettings> customBreakers, ClusterSettings clusterSettings) { super(); - this.fielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA, + HashMap<String, CircuitBreaker> childCircuitBreakers = new HashMap<>(); + childCircuitBreakers.put(CircuitBreaker.FIELDDATA, validateAndCreateBreaker( + new BreakerSettings(CircuitBreaker.FIELDDATA, FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings), CircuitBreaker.Durability.PERMANENT - ); - - this.inFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, + ))); + childCircuitBreakers.put(CircuitBreaker.IN_FLIGHT_REQUESTS, validateAndCreateBreaker( + new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING.get(settings), CircuitBreaker.Durability.TRANSIENT - ); - - this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST, + ))); + childCircuitBreakers.put(CircuitBreaker.REQUEST, validateAndCreateBreaker( + new BreakerSettings(CircuitBreaker.REQUEST, REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings), CircuitBreaker.Durability.TRANSIENT - ); - - this.accountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING, + ))); + childCircuitBreakers.put(CircuitBreaker.ACCOUNTING, validateAndCreateBreaker(new BreakerSettings(CircuitBreaker.ACCOUNTING, ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING.get(settings), CircuitBreaker.Durability.PERMANENT - ); - + ))); + for (BreakerSettings breakerSettings : customBreakers) { + if (childCircuitBreakers.containsKey(breakerSettings.getName())) { + throw new IllegalArgumentException("More than one circuit breaker with the name [" + + breakerSettings.getName() + +"] exists. Circuit breaker names must be unique"); + } + childCircuitBreakers.put(breakerSettings.getName(), validateAndCreateBreaker(breakerSettings)); + } + this.breakers = Collections.unmodifiableMap(childCircuitBreakers); this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), 1.0, CircuitBreaker.Type.PARENT, null); - - if (logger.isTraceEnabled()) { - logger.trace("parent circuit breaker with settings {}", this.parentSettings); - } + logger.trace(() -> new ParameterizedMessage("parent circuit breaker with settings {}", this.parentSettings)); this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings); - registerBreaker(this.requestSettings); - registerBreaker(this.fielddataSettings); - registerBreaker(this.inFlightRequestsSettings); - registerBreaker(this.accountingSettings); - clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, - this::setFieldDataBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, + FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + (limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.FIELDDATA, limit, overhead)); clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, - IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, - this::setRequestBreakerLimit); - clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING, - this::setAccountingBreakerLimit); + IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, + (limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, limit, overhead)); + clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, + REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, + (limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.REQUEST, limit, overhead)); + clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, + ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING, + (limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.ACCOUNTING, limit, overhead)); + clusterSettings.addAffixUpdateConsumer(CIRCUIT_BREAKER_LIMIT_SETTING, + CIRCUIT_BREAKER_OVERHEAD_SETTING, + (name, updatedValues) -> updateCircuitBreakerSettings(name, updatedValues.v1(), updatedValues.v2()), + (s, t) -> {}); } - private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) { - BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestMax.getBytes(), newRequestOverhead, - this.requestSettings.getType(), this.requestSettings.getDurability()); - registerBreaker(newRequestSettings); - this.requestSettings = newRequestSettings; - logger.info("Updated breaker settings request: {}", newRequestSettings); + private void updateCircuitBreakerSettings(String name, ByteSizeValue newLimit, Double newOverhead) { + CircuitBreaker childBreaker = breakers.get(name); + if (childBreaker != null) { + childBreaker.setLimitAndOverhead(newLimit.getBytes(), newOverhead); + logger.info("Updated limit {} and overhead {} for {}", newLimit.getStringRep(), newOverhead, name); + } } - private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) { - BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, - newInFlightRequestsMax.getBytes(), newInFlightRequestsOverhead, this.inFlightRequestsSettings.getType(), - this.inFlightRequestsSettings.getDurability()); - registerBreaker(newInFlightRequestsSettings); - this.inFlightRequestsSettings = newInFlightRequestsSettings; - logger.info("Updated breaker settings for in-flight requests: {}", newInFlightRequestsSettings); - } - - private void setFieldDataBreakerLimit(ByteSizeValue newFielddataMax, Double newFielddataOverhead) { - long newFielddataLimitBytes = newFielddataMax == null ? - HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.getBytes(); - newFielddataOverhead = newFielddataOverhead == null ? - HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; - BreakerSettings newFielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead, - this.fielddataSettings.getType(), this.fielddataSettings.getDurability()); - registerBreaker(newFielddataSettings); - HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings; - logger.info("Updated breaker settings field data: {}", newFielddataSettings); - } - - private void setAccountingBreakerLimit(ByteSizeValue newAccountingMax, Double newAccountingOverhead) { - BreakerSettings newAccountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING, newAccountingMax.getBytes(), - newAccountingOverhead, HierarchyCircuitBreakerService.this.accountingSettings.getType(), - this.accountingSettings.getDurability()); - registerBreaker(newAccountingSettings); - HierarchyCircuitBreakerService.this.accountingSettings = newAccountingSettings; - logger.info("Updated breaker settings for accounting requests: {}", newAccountingSettings); - } - - private boolean validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) { + private void validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) { BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, CircuitBreaker.Type.PARENT, null); validateSettings(new BreakerSettings[]{newParentSettings}); - return true; } private void setTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) { - BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, + this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0, CircuitBreaker.Type.PARENT, null); - this.parentSettings = newParentSettings; } /** @@ -243,7 +221,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { // Manually add the parent breaker settings since they aren't part of the breaker map allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(), memoryUsed(0L).totalUsage, 1.0, parentTripCount.get())); - return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()])); + return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[0])); } @Override @@ -331,56 +309,29 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { message.append("]"); } message.append(", usages ["); - message.append(String.join(", ", - this.breakers.entrySet().stream().map(e -> { + message.append(this.breakers.entrySet().stream().map(e -> { final CircuitBreaker breaker = e.getValue(); final long breakerUsed = (long)(breaker.getUsed() * breaker.getOverhead()); return e.getKey() + "=" + breakerUsed + "/" + new ByteSizeValue(breakerUsed); - }) - .collect(Collectors.toList()))); + }).collect(Collectors.joining(", "))); message.append("]"); // derive durability of a tripped parent breaker depending on whether the majority of memory tracked by // child circuit breakers is categorized as transient or permanent. CircuitBreaker.Durability durability = memoryUsed.transientChildUsage >= memoryUsed.permanentChildUsage ? CircuitBreaker.Durability.TRANSIENT : CircuitBreaker.Durability.PERMANENT; - logger.debug("{}", message); + logger.debug(() -> new ParameterizedMessage("{}", message.toString())); throw new CircuitBreakingException(message.toString(), memoryUsed.totalUsage, parentLimit, durability); } } - /** - * Allows to register a custom circuit breaker. - * Warning: Will overwrite any existing custom breaker with the same name. - */ - @Override - public void registerBreaker(BreakerSettings breakerSettings) { + private CircuitBreaker validateAndCreateBreaker(BreakerSettings breakerSettings) { // Validate the settings validateSettings(new BreakerSettings[] {breakerSettings}); - - if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) { - CircuitBreaker breaker = new NoopCircuitBreaker(breakerSettings.getName()); - breakers.put(breakerSettings.getName(), breaker); - } else { - CircuitBreaker oldBreaker; - CircuitBreaker breaker = new ChildMemoryCircuitBreaker(breakerSettings, - LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()), - this, breakerSettings.getName()); - - for (;;) { - oldBreaker = breakers.putIfAbsent(breakerSettings.getName(), breaker); - if (oldBreaker == null) { - return; - } - breaker = new ChildMemoryCircuitBreaker(breakerSettings, - (ChildMemoryCircuitBreaker)oldBreaker, - LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()), - this, breakerSettings.getName()); - - if (breakers.replace(breakerSettings.getName(), oldBreaker, breaker)) { - return; - } - } - } - + return breakerSettings.getType() == CircuitBreaker.Type.NOOP ? + new NoopCircuitBreaker(breakerSettings.getName()) : + new ChildMemoryCircuitBreaker(breakerSettings, + LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()), + this, + breakerSettings.getName()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java index 69de18e380b..831e8ccb23f 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java @@ -48,8 +48,4 @@ public class NoneCircuitBreakerService extends CircuitBreakerService { return new CircuitBreakerStats(CircuitBreaker.FIELDDATA, -1, -1, 0, 0); } - @Override - public void registerBreaker(BreakerSettings breakerSettings) { - // ignore - } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 1debc06beae..f7e1ac1580c 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -110,6 +110,7 @@ import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.analysis.AnalysisModule; +import org.elasticsearch.indices.breaker.BreakerSettings; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -127,6 +128,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutorRegistry; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.AnalysisPlugin; +import org.elasticsearch.plugins.CircuitBreakerPlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.EnginePlugin; @@ -400,8 +402,18 @@ public class Node implements Closeable { modules.add(indicesModule); SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); - CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), + List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class) + .stream() + .map(plugin -> plugin.getCircuitBreaker(settings)) + .collect(Collectors.toList()); + final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), + pluginCircuitBreakers, settingsModule.getClusterSettings()); + pluginsService.filterPlugins(CircuitBreakerPlugin.class) + .forEach(plugin -> { + CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName()); + plugin.setCircuitBreaker(breaker); + }); resourcesToClose.add(circuitBreakerService); modules.add(new GatewayModule()); @@ -1031,10 +1043,12 @@ public class Node implements Closeable { * Creates a new {@link CircuitBreakerService} based on the settings provided. * @see #BREAKER_TYPE_KEY */ - public static CircuitBreakerService createCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) { + public static CircuitBreakerService createCircuitBreakerService(Settings settings, + List<BreakerSettings> breakerSettings, + ClusterSettings clusterSettings) { String type = BREAKER_TYPE_KEY.get(settings); if (type.equals("hierarchy")) { - return new HierarchyCircuitBreakerService(settings, clusterSettings); + return new HierarchyCircuitBreakerService(settings, breakerSettings, clusterSettings); } else if (type.equals("none")) { return new NoneCircuitBreakerService(); } else { diff --git a/server/src/main/java/org/elasticsearch/plugins/CircuitBreakerPlugin.java b/server/src/main/java/org/elasticsearch/plugins/CircuitBreakerPlugin.java new file mode 100644 index 00000000000..d7682a4667e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/CircuitBreakerPlugin.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; + + +/** + * An extension point for {@link Plugin} implementations to add custom circuit breakers + */ +public interface CircuitBreakerPlugin { + + /** + * Each of the factory functions are passed to the configured {@link CircuitBreakerService}. + * + * The service then constructs a {@link CircuitBreaker} given the resulting {@link BreakerSettings}. + * + * Custom circuit breakers settings can be found in {@link BreakerSettings}. + * See: + * - limit (example: `breaker.foo.limit`) {@link BreakerSettings#CIRCUIT_BREAKER_LIMIT_SETTING} + * - overhead (example: `breaker.foo.overhead`) {@link BreakerSettings#CIRCUIT_BREAKER_OVERHEAD_SETTING} + * - type (example: `breaker.foo.type`) {@link BreakerSettings#CIRCUIT_BREAKER_TYPE} + * + * The `limit` and `overhead` settings will be dynamically updated in the circuit breaker service iff a {@link BreakerSettings} + * object with the same name is provided at node startup. + */ + BreakerSettings getCircuitBreaker(Settings settings); + + /** + * The passed {@link CircuitBreaker} object is the same one that was constructed by the {@link BreakerSettings} + * provided by {@link CircuitBreakerPlugin#getCircuitBreaker(Settings)}. + * + * This reference should never change throughout the lifetime of the node. + * + * @param circuitBreaker The constructed {@link CircuitBreaker} object from the {@link BreakerSettings} + * provided by {@link CircuitBreakerPlugin#getCircuitBreaker(Settings)} + */ + void setCircuitBreaker(CircuitBreaker circuitBreaker); +} diff --git a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index d671ad3dfad..15276f1ee4c 100644 --- a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -33,6 +33,7 @@ import org.junit.Before; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.function.Function; @@ -357,6 +358,7 @@ public class BigArraysTests extends ESTestCase { .put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), maxSize, ByteSizeUnit.BYTES) .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), + Collections.emptyList(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); @@ -421,6 +423,7 @@ public class BigArraysTests extends ESTestCase { .put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), maxSize, ByteSizeUnit.BYTES) .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), + Collections.emptyList(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST); return (withBreaking ? bigArrays.withCircuitBreaking() : bigArrays); diff --git a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 610cf4c0221..298dcf098b0 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -89,7 +89,7 @@ public class ZenFaultDetectionTests extends ESTestCase { .build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); threadPool = new TestThreadPool(getClass().getName()); - circuitBreakerService = new HierarchyCircuitBreakerService(settings, clusterSettings); + circuitBreakerService = new HierarchyCircuitBreakerService(settings, Collections.emptyList(), clusterSettings); settingsA = Settings.builder().put("node.name", "TS_A").put(settings).build(); serviceA = build(settingsA, version0); nodeA = serviceA.getLocalDiscoNode(); diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/BreakerSettingsTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/BreakerSettingsTests.java new file mode 100644 index 00000000000..d64b645a12c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/breaker/BreakerSettingsTests.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class BreakerSettingsTests extends ESTestCase { + + public void testFromSettings() { + Settings clusterSettings = Settings.builder() + .put(BreakerSettings.breakerLimitSettingKey("foo"), "100b") + .put(BreakerSettings.breakerLimitSettingKey("bar"), "150b") + .put(BreakerSettings.breakerOverheadSettingKey("bar"), 2.5) + .put(BreakerSettings.breakerTypeSettingKey("bar"), CircuitBreaker.Type.MEMORY) + .build(); + + BreakerSettings breakerFoo = BreakerSettings.updateFromSettings(new BreakerSettings("foo", + 10L, + 1.2d, + CircuitBreaker.Type.NOOP, + CircuitBreaker.Durability.TRANSIENT), + clusterSettings); + assertThat(breakerFoo.getDurability(), equalTo(CircuitBreaker.Durability.TRANSIENT)); + assertThat(breakerFoo.getLimit(), equalTo(100L)); + assertThat(breakerFoo.getOverhead(), equalTo(1.2)); + assertThat(breakerFoo.getType(), equalTo(CircuitBreaker.Type.NOOP)); + + BreakerSettings breakerBar = BreakerSettings.updateFromSettings(new BreakerSettings("bar", + 5L, + 0.5d, + CircuitBreaker.Type.NOOP, + CircuitBreaker.Durability.PERMANENT), + clusterSettings); + assertThat(breakerBar.getDurability(), equalTo(CircuitBreaker.Durability.PERMANENT)); + assertThat(breakerBar.getLimit(), equalTo(150L)); + assertThat(breakerBar.getOverhead(), equalTo(2.5)); + assertThat(breakerBar.getType(), equalTo(CircuitBreaker.Type.MEMORY)); + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 21e9ad59e32..388f203ad71 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -30,16 +30,22 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.test.ESTestCase; +import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; public class HierarchyCircuitBreakerServiceTests extends ESTestCase { + public void testThreadedUpdatesToChildBreaker() throws Exception { final int NUM_THREADS = scaledRandomIntBetween(3, 15); final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500); @@ -49,6 +55,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { final AtomicReference<ChildMemoryCircuitBreaker> breakerRef = new AtomicReference<>(null); final CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, + Collections.emptyList(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) { @Override @@ -107,6 +114,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { final AtomicInteger parentTripped = new AtomicInteger(0); final AtomicReference<ChildMemoryCircuitBreaker> breakerRef = new AtomicReference<>(null); final CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, + Collections.emptyList(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) { @Override @@ -166,12 +174,11 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { assertThat("total breaker was tripped at least once", tripped.get(), greaterThanOrEqualTo(1)); } - /** * Test that a breaker correctly redistributes to a different breaker, in * this case, the request breaker borrows space from the fielddata breaker */ - public void testBorrowingSiblingBreakerMemory() throws Exception { + public void testBorrowingSiblingBreakerMemory() { Settings clusterSettings = Settings.builder() .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200mb") @@ -179,6 +186,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb") .build(); try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST); CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(CircuitBreaker.FIELDDATA); @@ -207,7 +215,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { } } - public void testParentBreaksOnRealMemoryUsage() throws Exception { + public void testParentBreaksOnRealMemoryUsage() { Settings clusterSettings = Settings.builder() .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE) .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200b") @@ -217,6 +225,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { AtomicLong memoryUsage = new AtomicLong(); final CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) { @Override long currentMemoryUsage() { @@ -269,6 +278,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb") .build(); try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST); CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(CircuitBreaker.FIELDDATA); @@ -294,16 +304,17 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { } } - public void testAllocationBucketsBreaker() throws Exception { + public void testAllocationBucketsBreaker() { Settings clusterSettings = Settings.builder() .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b") .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "false") .build(); - try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + try (HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { - long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit(); + long parentLimitBytes = service.getParentLimit(); assertEquals(new ByteSizeValue(100, ByteSizeUnit.BYTES).getBytes(), parentLimitBytes); CircuitBreaker breaker = service.getBreaker(CircuitBreaker.REQUEST); @@ -320,7 +331,39 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { } } - private long mb(long size) { + public void testRegisterCustomCircuitBreakers_WithDuplicates() { + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, + () -> new HierarchyCircuitBreakerService( + Settings.EMPTY, + Collections.singletonList(new BreakerSettings(CircuitBreaker.FIELDDATA, 100, 1.2)), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + assertThat(iae.getMessage(), + containsString("More than one circuit breaker with the name [fielddata] exists. Circuit breaker names must be unique")); + + iae = expectThrows(IllegalArgumentException.class, + () -> new HierarchyCircuitBreakerService( + Settings.EMPTY, + Arrays.asList(new BreakerSettings("foo", 100, 1.2), new BreakerSettings("foo", 200, 0.1)), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + assertThat(iae.getMessage(), + containsString("More than one circuit breaker with the name [foo] exists. Circuit breaker names must be unique")); + } + + public void testCustomCircuitBreakers() { + try (CircuitBreakerService service = new HierarchyCircuitBreakerService( + Settings.EMPTY, + Arrays.asList(new BreakerSettings("foo", 100, 1.2), new BreakerSettings("bar", 200, 0.1)), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + assertThat(service.getBreaker("foo"), is(not(nullValue()))); + assertThat(service.getBreaker("foo").getOverhead(), equalTo(1.2)); + assertThat(service.getBreaker("foo").getLimit(), equalTo(100L)); + assertThat(service.getBreaker("bar"), is(not(nullValue()))); + assertThat(service.getBreaker("bar").getOverhead(), equalTo(0.1)); + assertThat(service.getBreaker("bar").getLimit(), equalTo(200L)); + } + } + + private static long mb(long size) { return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes(); } } diff --git a/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java b/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java index d01e32d78ac..f428607d151 100644 --- a/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java +++ b/server/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java @@ -20,17 +20,12 @@ package org.elasticsearch.indices.memory.breaker; import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indices.breaker.BreakerSettings; -import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; /** * Unit tests for the circuit breaker @@ -65,16 +60,4 @@ public class CircuitBreakerUnitTests extends ESTestCase { } } - public void testRegisterCustomBreaker() throws Exception { - CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, - ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - String customName = "custom"; - BreakerSettings settings = new BreakerSettings(customName, 20, 1.0); - service.registerBreaker(settings); - - CircuitBreaker breaker = service.getBreaker(customName); - assertThat(breaker, notNullValue()); - assertThat(breaker, instanceOf(CircuitBreaker.class)); - assertThat(breaker.getName(), is(customName)); - } } diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index 3713dfdb52b..c04a8b7fe9a 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -19,9 +19,11 @@ package org.elasticsearch.node; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.bootstrap.BootstrapContext; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -30,6 +32,9 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine.Searcher; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.plugins.CircuitBreakerPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -50,6 +55,10 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") public class NodeTests extends ESTestCase { @@ -288,4 +297,45 @@ public class NodeTests extends ESTestCase { shard.store().decRef(); assertThat(e.getMessage(), containsString("Something is leaking index readers or store references")); } + + public void testCreateWithCircuitBreakerPlugins() throws IOException { + Settings.Builder settings = baseSettings() + .put("breaker.test_breaker.limit", "50b"); + List<Class<? extends Plugin>> plugins = basePlugins(); + plugins.add(MockCircuitBreakerPlugin.class); + try (Node node = new MockNode(settings.build(), plugins)) { + CircuitBreakerService service = node.injector().getInstance(CircuitBreakerService.class); + assertThat(service.getBreaker("test_breaker"), is(not(nullValue()))); + assertThat(service.getBreaker("test_breaker").getLimit(), equalTo(50L)); + CircuitBreakerPlugin breakerPlugin = node.getPluginsService().filterPlugins(CircuitBreakerPlugin.class).get(0); + assertTrue(breakerPlugin instanceof MockCircuitBreakerPlugin); + assertSame("plugin circuit breaker instance is not the same as breaker service's instance", + ((MockCircuitBreakerPlugin)breakerPlugin).myCircuitBreaker.get(), + service.getBreaker("test_breaker")); + } + } + + public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin { + + private SetOnce<CircuitBreaker> myCircuitBreaker = new SetOnce<>(); + + public MockCircuitBreakerPlugin() {} + + @Override + public BreakerSettings getCircuitBreaker(Settings settings) { + return BreakerSettings.updateFromSettings( + new BreakerSettings("test_breaker", + 100L, + 1.0d, + CircuitBreaker.Type.MEMORY, + CircuitBreaker.Durability.TRANSIENT), + settings); + } + + @Override + public void setCircuitBreaker(CircuitBreaker circuitBreaker) { + assertThat(circuitBreaker.getName(), equalTo("test_breaker")); + myCircuitBreaker.set(circuitBreaker); + } + } } diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 484ba127d1d..00f883977af 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -84,6 +84,7 @@ public class RestControllerTests extends ESTestCase { // We want to have reproducible results in this test, hence we disable real memory usage accounting .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .build(), + Collections.emptyList(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); usageService = new UsageService(); // we can do this here only because we know that we don't adjust breaker settings dynamically in the test diff --git a/server/src/test/java/org/elasticsearch/rest/RestHttpResponseHeadersTests.java b/server/src/test/java/org/elasticsearch/rest/RestHttpResponseHeadersTests.java index 6a4a8749397..ce54b896ef3 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestHttpResponseHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestHttpResponseHeadersTests.java @@ -84,6 +84,7 @@ public class RestHttpResponseHeadersTests extends ESTestCase { // Initialize test candidate RestController CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, + Collections.emptyList(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); final Settings settings = Settings.EMPTY; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 8c08d7213f0..7d1bb2a0901 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -377,7 +377,9 @@ public abstract class IndexShardTestCase extends ESTestCase { SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); final Engine.Warmer warmer = createTestWarmer(indexSettings); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings); + CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, + Collections.emptyList(), + clusterSettings); indexShard = new IndexShard( routing, indexSettings, diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 90489834477..468a3846dad 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -28,6 +28,7 @@ import org.hamcrest.Matchers; import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; @@ -147,6 +148,7 @@ public class FrozenEngineTests extends EngineTestCase { EngineConfig config = config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, // we don't merge we want no background merges to happen to ensure we have consistent breaker stats null, listener, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + Collections.emptyList(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); final int docs; @@ -203,6 +205,7 @@ public class FrozenEngineTests extends EngineTestCase { try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + Collections.emptyList(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); try (InternalEngine engine = createEngine(config)) { diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/bucket/geogrid/GeoGridTilerTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/bucket/geogrid/GeoGridTilerTests.java index f00c8e50244..49926ac4e2f 100644 --- a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/bucket/geogrid/GeoGridTilerTests.java +++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/bucket/geogrid/GeoGridTilerTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.spatial.index.fielddata.TriangleTreeReader; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.function.Consumer; import static org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils.LATITUDE_MASK; @@ -478,9 +479,9 @@ public class GeoGridTilerTests extends ESTestCase { numBytes = values.getValuesBytes(); } - CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, - ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - service.registerBreaker(new BreakerSettings("limited", numBytes - 1, 1.0)); + CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, + Collections.singletonList(new BreakerSettings("limited", numBytes - 1, 1.0)), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); CircuitBreaker limitedBreaker = service.getBreaker("limited"); Consumer<Long> circuitBreakerConsumer = (l) -> limitedBreaker.addEstimateBytesAndMaybeBreak(l, "agg");