mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-24 17:09:48 +00:00
* Add new circuitbreaker plugin and refactor CircuitBreakerService (#55695) This commit lays the ground work for plugins supplying their own circuit breakers. It adds a new interface: `CircuitBreakerPlugin`. This interface provides methods for providing custom child CircuitBreaker objects. There are also facilities for allowing dynamic settings for the custom breakers. With the refactor, circuit breakers are no longer replaced on setting changes. Instead, the two mutable settings themselves are `volatile`. Plugins that want to use their custom circuit breaker should keep a reference of their constructed breaker.
This commit is contained in:
parent
aebb78bf5c
commit
15aba60c02
@ -41,8 +41,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
@ -316,37 +314,6 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void testCustomCircuitBreakerRegistration() throws Exception {
|
||||
Iterable<CircuitBreakerService> serviceIter = internalCluster().getInstances(CircuitBreakerService.class);
|
||||
|
||||
final String breakerName = "customBreaker";
|
||||
BreakerSettings breakerSettings = new BreakerSettings(breakerName, 8, 1.03);
|
||||
CircuitBreaker breaker = null;
|
||||
|
||||
for (CircuitBreakerService s : serviceIter) {
|
||||
s.registerBreaker(breakerSettings);
|
||||
breaker = s.getBreaker(breakerSettings.getName());
|
||||
}
|
||||
|
||||
if (breaker != null) {
|
||||
try {
|
||||
breaker.addEstimateBytesAndMaybeBreak(16, "test");
|
||||
} catch (CircuitBreakingException e) {
|
||||
// ignore, we forced a circuit break
|
||||
}
|
||||
}
|
||||
|
||||
NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().clear()
|
||||
.addMetric(BREAKER.metricName())
|
||||
.get();
|
||||
int breaks = 0;
|
||||
for (NodeStats stat : stats.getNodes()) {
|
||||
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(breakerName);
|
||||
breaks += breakerStats.getTrippedCount();
|
||||
}
|
||||
assertThat(breaks, greaterThanOrEqualTo(1));
|
||||
}
|
||||
|
||||
public void testCanResetUnreasonableSettings() {
|
||||
if (noopBreakerUsed()) {
|
||||
logger.info("--> noop breakers used, skipping test");
|
||||
@ -367,7 +334,7 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
|
||||
|
||||
}
|
||||
|
||||
public void testLimitsRequestSize() throws Exception {
|
||||
public void testLimitsRequestSize() {
|
||||
ByteSizeValue inFlightRequestsLimit = new ByteSizeValue(8, ByteSizeUnit.KB);
|
||||
if (noopBreakerUsed()) {
|
||||
logger.info("--> noop breakers used, skipping test");
|
||||
|
@ -191,6 +191,7 @@ public abstract class TransportClient extends AbstractClient {
|
||||
modules.add(actionModule);
|
||||
|
||||
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
|
||||
Collections.emptyList(),
|
||||
settingsModule.getClusterSettings());
|
||||
resourcesToClose.add(circuitBreakerService);
|
||||
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.common.breaker;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
@ -31,8 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
*/
|
||||
public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
|
||||
private final long memoryBytesLimit;
|
||||
private final double overheadConstant;
|
||||
private volatile LimitAndOverhead limitAndOverhead;
|
||||
private final Durability durability;
|
||||
private final AtomicLong used;
|
||||
private final AtomicLong trippedCount;
|
||||
@ -40,19 +40,6 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
private final HierarchyCircuitBreakerService parent;
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* Create a circuit breaker that will break if the number of estimated
|
||||
* bytes grows above the limit. All estimations will be multiplied by
|
||||
* the given overheadConstant. This breaker starts with 0 bytes used.
|
||||
* @param settings settings to configure this breaker
|
||||
* @param parent parent circuit breaker service to delegate tripped breakers to
|
||||
* @param name the name of the breaker
|
||||
*/
|
||||
public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,
|
||||
HierarchyCircuitBreakerService parent, String name) {
|
||||
this(settings, null, logger, parent, name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a circuit breaker that will break if the number of estimated
|
||||
* bytes grows above the limit. All estimations will be multiplied by
|
||||
@ -61,25 +48,15 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
* @param settings settings to configure this breaker
|
||||
* @param parent parent circuit breaker service to delegate tripped breakers to
|
||||
* @param name the name of the breaker
|
||||
* @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
|
||||
*/
|
||||
public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
|
||||
Logger logger, HierarchyCircuitBreakerService parent, String name) {
|
||||
public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger, HierarchyCircuitBreakerService parent, String name) {
|
||||
this.name = name;
|
||||
this.memoryBytesLimit = settings.getLimit();
|
||||
this.overheadConstant = settings.getOverhead();
|
||||
this.limitAndOverhead = new LimitAndOverhead(settings.getLimit(), settings.getOverhead());
|
||||
this.durability = settings.getDurability();
|
||||
if (oldBreaker == null) {
|
||||
this.used = new AtomicLong(0);
|
||||
this.trippedCount = new AtomicLong(0);
|
||||
} else {
|
||||
this.used = oldBreaker.used;
|
||||
this.trippedCount = oldBreaker.trippedCount;
|
||||
}
|
||||
this.used = new AtomicLong(0);
|
||||
this.trippedCount = new AtomicLong(0);
|
||||
this.logger = logger;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("creating ChildCircuitBreaker with settings {}", settings);
|
||||
}
|
||||
logger.trace(() -> new ParameterizedMessage("creating ChildCircuitBreaker with settings {}", settings));
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@ -89,12 +66,13 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
*/
|
||||
@Override
|
||||
public void circuitBreak(String fieldName, long bytesNeeded) {
|
||||
final long memoryBytesLimit = this.limitAndOverhead.limit;
|
||||
this.trippedCount.incrementAndGet();
|
||||
final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" +
|
||||
" would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +
|
||||
", which is larger than the limit of [" +
|
||||
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
|
||||
logger.debug("{}", message);
|
||||
logger.debug(() -> new ParameterizedMessage("{}", message));
|
||||
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
|
||||
}
|
||||
|
||||
@ -108,6 +86,9 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
*/
|
||||
@Override
|
||||
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
|
||||
final LimitAndOverhead limitAndOverhead = this.limitAndOverhead;
|
||||
final long memoryBytesLimit = limitAndOverhead.limit;
|
||||
final double overheadConstant = limitAndOverhead.overhead;
|
||||
// short-circuit on no data allowed, immediately throwing an exception
|
||||
if (memoryBytesLimit == 0) {
|
||||
circuitBreak(label, bytes);
|
||||
@ -117,10 +98,10 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
// If there is no limit (-1), we can optimize a bit by using
|
||||
// .addAndGet() instead of looping (because we don't have to check a
|
||||
// limit), which makes the RamAccountingTermsEnum case faster.
|
||||
if (this.memoryBytesLimit == -1) {
|
||||
if (memoryBytesLimit == -1) {
|
||||
newUsed = noLimit(bytes, label);
|
||||
} else {
|
||||
newUsed = limit(bytes, label);
|
||||
newUsed = limit(bytes, label, overheadConstant, memoryBytesLimit);
|
||||
}
|
||||
|
||||
// Additionally, we need to check that we haven't exceeded the parent's limit
|
||||
@ -139,14 +120,12 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
private long noLimit(long bytes, String label) {
|
||||
long newUsed;
|
||||
newUsed = this.used.addAndGet(bytes);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
|
||||
this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
|
||||
}
|
||||
logger.trace(() -> new ParameterizedMessage("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
|
||||
this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)));
|
||||
return newUsed;
|
||||
}
|
||||
|
||||
private long limit(long bytes, String label) {
|
||||
private long limit(long bytes, String label, double overheadConstant, long memoryBytesLimit) {
|
||||
long newUsed;// Otherwise, check the addition and commit the addition, looping if
|
||||
// there are conflicts. May result in additional logging, but it's
|
||||
// trace logging and shouldn't be counted on for additions.
|
||||
@ -188,9 +167,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
@Override
|
||||
public long addWithoutBreaking(long bytes) {
|
||||
long u = used.addAndGet(bytes);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
|
||||
}
|
||||
logger.trace(() -> new ParameterizedMessage("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u));
|
||||
assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
|
||||
return u;
|
||||
}
|
||||
@ -208,7 +185,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
*/
|
||||
@Override
|
||||
public long getLimit() {
|
||||
return this.memoryBytesLimit;
|
||||
return this.limitAndOverhead.limit;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -216,7 +193,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
*/
|
||||
@Override
|
||||
public double getOverhead() {
|
||||
return this.overheadConstant;
|
||||
return this.limitAndOverhead.overhead;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -242,4 +219,20 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
||||
public Durability getDurability() {
|
||||
return this.durability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLimitAndOverhead(long limit, double overhead) {
|
||||
this.limitAndOverhead = new LimitAndOverhead(limit, overhead);
|
||||
}
|
||||
|
||||
private static class LimitAndOverhead {
|
||||
|
||||
private final long limit;
|
||||
private final double overhead;
|
||||
|
||||
LimitAndOverhead(long limit, double overhead) {
|
||||
this.limit = limit;
|
||||
this.overhead = overhead;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -139,4 +139,12 @@ public interface CircuitBreaker {
|
||||
* @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
|
||||
*/
|
||||
Durability getDurability();
|
||||
|
||||
/**
|
||||
* sets the new limit and overhead values for the circuit breaker.
|
||||
* The resulting write should be readable by other threads.
|
||||
* @param limit the desired limit
|
||||
* @param overhead the desired overhead constant
|
||||
*/
|
||||
void setLimitAndOverhead(long limit, double overhead);
|
||||
}
|
||||
|
@ -76,4 +76,7 @@ public class NoopCircuitBreaker implements CircuitBreaker {
|
||||
public Durability getDurability() {
|
||||
return Durability.PERMANENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLimitAndOverhead(long limit, double overhead) { }
|
||||
}
|
||||
|
@ -90,6 +90,7 @@ import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.analysis.HunspellService;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
@ -200,6 +201,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
|
||||
BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
|
||||
BalancedShardsAllocator.THRESHOLD_SETTING,
|
||||
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
BreakerSettings.CIRCUIT_BREAKER_TYPE,
|
||||
ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
|
||||
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
|
||||
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
|
||||
|
@ -20,6 +20,8 @@
|
||||
package org.elasticsearch.indices.breaker;
|
||||
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
/**
|
||||
@ -27,12 +29,70 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
*/
|
||||
public final class BreakerSettings {
|
||||
|
||||
private static final String BREAKER_SETTING_PREFIX = "breaker.";
|
||||
private static final String BREAKER_LIMIT_SUFFIX = "limit";
|
||||
private static final String BREAKER_OVERHEAD_SUFFIX = "overhead";
|
||||
private static final String BREAKER_TYPE_SUFFIX = "type";
|
||||
|
||||
public static final Setting.AffixSetting<ByteSizeValue> CIRCUIT_BREAKER_LIMIT_SETTING =
|
||||
Setting.affixKeySetting(BREAKER_SETTING_PREFIX,
|
||||
BREAKER_LIMIT_SUFFIX,
|
||||
name -> Setting.memorySizeSetting(name, "100%", Setting.Property.Dynamic, Setting.Property.NodeScope));
|
||||
static String breakerLimitSettingKey(String breakerName) {
|
||||
return BREAKER_SETTING_PREFIX + breakerName + "." + BREAKER_LIMIT_SUFFIX;
|
||||
}
|
||||
|
||||
public static final Setting.AffixSetting<Double> CIRCUIT_BREAKER_OVERHEAD_SETTING =
|
||||
Setting.affixKeySetting(BREAKER_SETTING_PREFIX,
|
||||
BREAKER_OVERHEAD_SUFFIX,
|
||||
name -> Setting.doubleSetting(name, 2.0d, 0.0d, Setting.Property.Dynamic, Setting.Property.NodeScope));
|
||||
static String breakerOverheadSettingKey(String breakerName) {
|
||||
return BREAKER_SETTING_PREFIX + breakerName + "." + BREAKER_OVERHEAD_SUFFIX;
|
||||
}
|
||||
|
||||
public static final Setting.AffixSetting<CircuitBreaker.Type> CIRCUIT_BREAKER_TYPE =
|
||||
Setting.affixKeySetting(BREAKER_SETTING_PREFIX,
|
||||
BREAKER_TYPE_SUFFIX,
|
||||
name -> new Setting<>(name,
|
||||
"noop",
|
||||
CircuitBreaker.Type::parseValue,
|
||||
(type) -> {
|
||||
if (CircuitBreaker.Type.PARENT.equals(type)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid circuit breaker type [parent]. Only [memory] or [noop] are configurable"
|
||||
);
|
||||
}
|
||||
},
|
||||
Setting.Property.NodeScope));
|
||||
static String breakerTypeSettingKey(String breakerName) {
|
||||
return BREAKER_SETTING_PREFIX + breakerName + "." + BREAKER_TYPE_SUFFIX;
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final long limitBytes;
|
||||
private final double overhead;
|
||||
private final CircuitBreaker.Type type;
|
||||
private final CircuitBreaker.Durability durability;
|
||||
|
||||
public static BreakerSettings updateFromSettings(BreakerSettings defaultSettings, Settings currentSettings) {
|
||||
final String breakerName = defaultSettings.name;
|
||||
return new BreakerSettings(breakerName,
|
||||
getOrDefault(CIRCUIT_BREAKER_LIMIT_SETTING.getConcreteSetting(breakerLimitSettingKey(breakerName)),
|
||||
new ByteSizeValue(defaultSettings.limitBytes),
|
||||
currentSettings).getBytes(),
|
||||
getOrDefault(CIRCUIT_BREAKER_OVERHEAD_SETTING.getConcreteSetting(breakerOverheadSettingKey(breakerName)),
|
||||
defaultSettings.overhead,
|
||||
currentSettings),
|
||||
getOrDefault(CIRCUIT_BREAKER_TYPE.getConcreteSetting(breakerTypeSettingKey(breakerName)),
|
||||
defaultSettings.type,
|
||||
currentSettings),
|
||||
defaultSettings.durability);
|
||||
}
|
||||
|
||||
private static <T> T getOrDefault(Setting<T> concreteSetting, T defaultValue, Settings settings) {
|
||||
return concreteSetting.exists(settings) ? concreteSetting.get(settings) : defaultValue;
|
||||
}
|
||||
|
||||
public BreakerSettings(String name, long limitBytes, double overhead) {
|
||||
this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY, CircuitBreaker.Durability.PERMANENT);
|
||||
}
|
||||
@ -69,7 +129,7 @@ public final class BreakerSettings {
|
||||
public String toString() {
|
||||
return "[" + this.name +
|
||||
",type=" + this.type.toString() +
|
||||
",durability=" + this.durability.toString() +
|
||||
",durability=" + (this.durability == null ? "null" : this.durability.toString()) +
|
||||
",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) +
|
||||
",overhead=" + this.overhead + "]";
|
||||
}
|
||||
|
@ -34,11 +34,6 @@ public abstract class CircuitBreakerService extends AbstractLifecycleComponent {
|
||||
protected CircuitBreakerService() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows to register of a custom circuit breaker.
|
||||
*/
|
||||
public abstract void registerBreaker(BreakerSettings breakerSettings);
|
||||
|
||||
/**
|
||||
* @return the breaker that can be used to register estimates against
|
||||
*/
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.indices.breaker;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
@ -34,12 +35,16 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING;
|
||||
import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING;
|
||||
|
||||
/**
|
||||
* CircuitBreakerService that attempts to redistribute space between breakers
|
||||
* if tripped
|
||||
@ -51,7 +56,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
||||
|
||||
private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
|
||||
|
||||
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();
|
||||
private final Map<String, CircuitBreaker> breakers;
|
||||
|
||||
public static final Setting<Boolean> USE_REAL_MEMORY_USAGE_SETTING =
|
||||
Setting.boolSetting("indices.breaker.total.use_real_memory", true, Property.NodeScope);
|
||||
@ -95,120 +100,93 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
||||
|
||||
private final boolean trackRealMemoryUsage;
|
||||
private volatile BreakerSettings parentSettings;
|
||||
private volatile BreakerSettings fielddataSettings;
|
||||
private volatile BreakerSettings inFlightRequestsSettings;
|
||||
private volatile BreakerSettings requestSettings;
|
||||
private volatile BreakerSettings accountingSettings;
|
||||
|
||||
// Tripped count for when redistribution was attempted but wasn't successful
|
||||
private final AtomicLong parentTripCount = new AtomicLong(0);
|
||||
|
||||
public HierarchyCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
|
||||
public HierarchyCircuitBreakerService(Settings settings, List<BreakerSettings> customBreakers, ClusterSettings clusterSettings) {
|
||||
super();
|
||||
this.fielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA,
|
||||
HashMap<String, CircuitBreaker> childCircuitBreakers = new HashMap<>();
|
||||
childCircuitBreakers.put(CircuitBreaker.FIELDDATA, validateAndCreateBreaker(
|
||||
new BreakerSettings(CircuitBreaker.FIELDDATA,
|
||||
FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
|
||||
FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
|
||||
FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
|
||||
CircuitBreaker.Durability.PERMANENT
|
||||
);
|
||||
|
||||
this.inFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
|
||||
)));
|
||||
childCircuitBreakers.put(CircuitBreaker.IN_FLIGHT_REQUESTS, validateAndCreateBreaker(
|
||||
new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
|
||||
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
|
||||
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
|
||||
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
|
||||
CircuitBreaker.Durability.TRANSIENT
|
||||
);
|
||||
|
||||
this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST,
|
||||
)));
|
||||
childCircuitBreakers.put(CircuitBreaker.REQUEST, validateAndCreateBreaker(
|
||||
new BreakerSettings(CircuitBreaker.REQUEST,
|
||||
REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
|
||||
REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
|
||||
REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
|
||||
CircuitBreaker.Durability.TRANSIENT
|
||||
);
|
||||
|
||||
this.accountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING,
|
||||
)));
|
||||
childCircuitBreakers.put(CircuitBreaker.ACCOUNTING, validateAndCreateBreaker(new BreakerSettings(CircuitBreaker.ACCOUNTING,
|
||||
ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
|
||||
ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
|
||||
ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
|
||||
CircuitBreaker.Durability.PERMANENT
|
||||
);
|
||||
|
||||
)));
|
||||
for (BreakerSettings breakerSettings : customBreakers) {
|
||||
if (childCircuitBreakers.containsKey(breakerSettings.getName())) {
|
||||
throw new IllegalArgumentException("More than one circuit breaker with the name ["
|
||||
+ breakerSettings.getName()
|
||||
+"] exists. Circuit breaker names must be unique");
|
||||
}
|
||||
childCircuitBreakers.put(breakerSettings.getName(), validateAndCreateBreaker(breakerSettings));
|
||||
}
|
||||
this.breakers = Collections.unmodifiableMap(childCircuitBreakers);
|
||||
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT,
|
||||
TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), 1.0,
|
||||
CircuitBreaker.Type.PARENT, null);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
|
||||
}
|
||||
logger.trace(() -> new ParameterizedMessage("parent circuit breaker with settings {}", this.parentSettings));
|
||||
|
||||
this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings);
|
||||
|
||||
registerBreaker(this.requestSettings);
|
||||
registerBreaker(this.fielddataSettings);
|
||||
registerBreaker(this.inFlightRequestsSettings);
|
||||
registerBreaker(this.accountingSettings);
|
||||
|
||||
clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit,
|
||||
this::validateTotalCircuitBreakerLimit);
|
||||
clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
this::setFieldDataBreakerLimit);
|
||||
clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
(limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.FIELDDATA, limit, overhead));
|
||||
clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit);
|
||||
clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
this::setRequestBreakerLimit);
|
||||
clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
this::setAccountingBreakerLimit);
|
||||
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
(limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, limit, overhead));
|
||||
clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
(limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.REQUEST, limit, overhead));
|
||||
clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
(limit, overhead) -> updateCircuitBreakerSettings(CircuitBreaker.ACCOUNTING, limit, overhead));
|
||||
clusterSettings.addAffixUpdateConsumer(CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
(name, updatedValues) -> updateCircuitBreakerSettings(name, updatedValues.v1(), updatedValues.v2()),
|
||||
(s, t) -> {});
|
||||
}
|
||||
|
||||
private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) {
|
||||
BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestMax.getBytes(), newRequestOverhead,
|
||||
this.requestSettings.getType(), this.requestSettings.getDurability());
|
||||
registerBreaker(newRequestSettings);
|
||||
this.requestSettings = newRequestSettings;
|
||||
logger.info("Updated breaker settings request: {}", newRequestSettings);
|
||||
private void updateCircuitBreakerSettings(String name, ByteSizeValue newLimit, Double newOverhead) {
|
||||
CircuitBreaker childBreaker = breakers.get(name);
|
||||
if (childBreaker != null) {
|
||||
childBreaker.setLimitAndOverhead(newLimit.getBytes(), newOverhead);
|
||||
logger.info("Updated limit {} and overhead {} for {}", newLimit.getStringRep(), newOverhead, name);
|
||||
}
|
||||
}
|
||||
|
||||
private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) {
|
||||
BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
|
||||
newInFlightRequestsMax.getBytes(), newInFlightRequestsOverhead, this.inFlightRequestsSettings.getType(),
|
||||
this.inFlightRequestsSettings.getDurability());
|
||||
registerBreaker(newInFlightRequestsSettings);
|
||||
this.inFlightRequestsSettings = newInFlightRequestsSettings;
|
||||
logger.info("Updated breaker settings for in-flight requests: {}", newInFlightRequestsSettings);
|
||||
}
|
||||
|
||||
private void setFieldDataBreakerLimit(ByteSizeValue newFielddataMax, Double newFielddataOverhead) {
|
||||
long newFielddataLimitBytes = newFielddataMax == null ?
|
||||
HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.getBytes();
|
||||
newFielddataOverhead = newFielddataOverhead == null ?
|
||||
HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead;
|
||||
BreakerSettings newFielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead,
|
||||
this.fielddataSettings.getType(), this.fielddataSettings.getDurability());
|
||||
registerBreaker(newFielddataSettings);
|
||||
HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
|
||||
logger.info("Updated breaker settings field data: {}", newFielddataSettings);
|
||||
}
|
||||
|
||||
private void setAccountingBreakerLimit(ByteSizeValue newAccountingMax, Double newAccountingOverhead) {
|
||||
BreakerSettings newAccountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING, newAccountingMax.getBytes(),
|
||||
newAccountingOverhead, HierarchyCircuitBreakerService.this.accountingSettings.getType(),
|
||||
this.accountingSettings.getDurability());
|
||||
registerBreaker(newAccountingSettings);
|
||||
HierarchyCircuitBreakerService.this.accountingSettings = newAccountingSettings;
|
||||
logger.info("Updated breaker settings for accounting requests: {}", newAccountingSettings);
|
||||
}
|
||||
|
||||
private boolean validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) {
|
||||
private void validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) {
|
||||
BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0,
|
||||
CircuitBreaker.Type.PARENT, null);
|
||||
validateSettings(new BreakerSettings[]{newParentSettings});
|
||||
return true;
|
||||
}
|
||||
|
||||
private void setTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) {
|
||||
BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0,
|
||||
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, byteSizeValue.getBytes(), 1.0,
|
||||
CircuitBreaker.Type.PARENT, null);
|
||||
this.parentSettings = newParentSettings;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -243,7 +221,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
||||
// Manually add the parent breaker settings since they aren't part of the breaker map
|
||||
allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
|
||||
memoryUsed(0L).totalUsage, 1.0, parentTripCount.get()));
|
||||
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
|
||||
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[0]));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -331,56 +309,29 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
||||
message.append("]");
|
||||
}
|
||||
message.append(", usages [");
|
||||
message.append(String.join(", ",
|
||||
this.breakers.entrySet().stream().map(e -> {
|
||||
message.append(this.breakers.entrySet().stream().map(e -> {
|
||||
final CircuitBreaker breaker = e.getValue();
|
||||
final long breakerUsed = (long)(breaker.getUsed() * breaker.getOverhead());
|
||||
return e.getKey() + "=" + breakerUsed + "/" + new ByteSizeValue(breakerUsed);
|
||||
})
|
||||
.collect(Collectors.toList())));
|
||||
}).collect(Collectors.joining(", ")));
|
||||
message.append("]");
|
||||
// derive durability of a tripped parent breaker depending on whether the majority of memory tracked by
|
||||
// child circuit breakers is categorized as transient or permanent.
|
||||
CircuitBreaker.Durability durability = memoryUsed.transientChildUsage >= memoryUsed.permanentChildUsage ?
|
||||
CircuitBreaker.Durability.TRANSIENT : CircuitBreaker.Durability.PERMANENT;
|
||||
logger.debug("{}", message);
|
||||
logger.debug(() -> new ParameterizedMessage("{}", message.toString()));
|
||||
throw new CircuitBreakingException(message.toString(), memoryUsed.totalUsage, parentLimit, durability);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows to register a custom circuit breaker.
|
||||
* Warning: Will overwrite any existing custom breaker with the same name.
|
||||
*/
|
||||
@Override
|
||||
public void registerBreaker(BreakerSettings breakerSettings) {
|
||||
private CircuitBreaker validateAndCreateBreaker(BreakerSettings breakerSettings) {
|
||||
// Validate the settings
|
||||
validateSettings(new BreakerSettings[] {breakerSettings});
|
||||
|
||||
if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) {
|
||||
CircuitBreaker breaker = new NoopCircuitBreaker(breakerSettings.getName());
|
||||
breakers.put(breakerSettings.getName(), breaker);
|
||||
} else {
|
||||
CircuitBreaker oldBreaker;
|
||||
CircuitBreaker breaker = new ChildMemoryCircuitBreaker(breakerSettings,
|
||||
LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()),
|
||||
this, breakerSettings.getName());
|
||||
|
||||
for (;;) {
|
||||
oldBreaker = breakers.putIfAbsent(breakerSettings.getName(), breaker);
|
||||
if (oldBreaker == null) {
|
||||
return;
|
||||
}
|
||||
breaker = new ChildMemoryCircuitBreaker(breakerSettings,
|
||||
(ChildMemoryCircuitBreaker)oldBreaker,
|
||||
LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()),
|
||||
this, breakerSettings.getName());
|
||||
|
||||
if (breakers.replace(breakerSettings.getName(), oldBreaker, breaker)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return breakerSettings.getType() == CircuitBreaker.Type.NOOP ?
|
||||
new NoopCircuitBreaker(breakerSettings.getName()) :
|
||||
new ChildMemoryCircuitBreaker(breakerSettings,
|
||||
LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()),
|
||||
this,
|
||||
breakerSettings.getName());
|
||||
}
|
||||
}
|
||||
|
@ -48,8 +48,4 @@ public class NoneCircuitBreakerService extends CircuitBreakerService {
|
||||
return new CircuitBreakerStats(CircuitBreaker.FIELDDATA, -1, -1, 0, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerBreaker(BreakerSettings breakerSettings) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
@ -110,6 +110,7 @@ import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.indices.analysis.AnalysisModule;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
@ -127,6 +128,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.AnalysisPlugin;
|
||||
import org.elasticsearch.plugins.CircuitBreakerPlugin;
|
||||
import org.elasticsearch.plugins.ClusterPlugin;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.EnginePlugin;
|
||||
@ -400,8 +402,18 @@ public class Node implements Closeable {
|
||||
modules.add(indicesModule);
|
||||
|
||||
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
|
||||
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
|
||||
List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
|
||||
.stream()
|
||||
.map(plugin -> plugin.getCircuitBreaker(settings))
|
||||
.collect(Collectors.toList());
|
||||
final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
|
||||
pluginCircuitBreakers,
|
||||
settingsModule.getClusterSettings());
|
||||
pluginsService.filterPlugins(CircuitBreakerPlugin.class)
|
||||
.forEach(plugin -> {
|
||||
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
|
||||
plugin.setCircuitBreaker(breaker);
|
||||
});
|
||||
resourcesToClose.add(circuitBreakerService);
|
||||
modules.add(new GatewayModule());
|
||||
|
||||
@ -1031,10 +1043,12 @@ public class Node implements Closeable {
|
||||
* Creates a new {@link CircuitBreakerService} based on the settings provided.
|
||||
* @see #BREAKER_TYPE_KEY
|
||||
*/
|
||||
public static CircuitBreakerService createCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
|
||||
public static CircuitBreakerService createCircuitBreakerService(Settings settings,
|
||||
List<BreakerSettings> breakerSettings,
|
||||
ClusterSettings clusterSettings) {
|
||||
String type = BREAKER_TYPE_KEY.get(settings);
|
||||
if (type.equals("hierarchy")) {
|
||||
return new HierarchyCircuitBreakerService(settings, clusterSettings);
|
||||
return new HierarchyCircuitBreakerService(settings, breakerSettings, clusterSettings);
|
||||
} else if (type.equals("none")) {
|
||||
return new NoneCircuitBreakerService();
|
||||
} else {
|
||||
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugins;
|
||||
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
|
||||
|
||||
/**
|
||||
* An extension point for {@link Plugin} implementations to add custom circuit breakers
|
||||
*/
|
||||
public interface CircuitBreakerPlugin {
|
||||
|
||||
/**
|
||||
* Each of the factory functions are passed to the configured {@link CircuitBreakerService}.
|
||||
*
|
||||
* The service then constructs a {@link CircuitBreaker} given the resulting {@link BreakerSettings}.
|
||||
*
|
||||
* Custom circuit breakers settings can be found in {@link BreakerSettings}.
|
||||
* See:
|
||||
* - limit (example: `breaker.foo.limit`) {@link BreakerSettings#CIRCUIT_BREAKER_LIMIT_SETTING}
|
||||
* - overhead (example: `breaker.foo.overhead`) {@link BreakerSettings#CIRCUIT_BREAKER_OVERHEAD_SETTING}
|
||||
* - type (example: `breaker.foo.type`) {@link BreakerSettings#CIRCUIT_BREAKER_TYPE}
|
||||
*
|
||||
* The `limit` and `overhead` settings will be dynamically updated in the circuit breaker service iff a {@link BreakerSettings}
|
||||
* object with the same name is provided at node startup.
|
||||
*/
|
||||
BreakerSettings getCircuitBreaker(Settings settings);
|
||||
|
||||
/**
|
||||
* The passed {@link CircuitBreaker} object is the same one that was constructed by the {@link BreakerSettings}
|
||||
* provided by {@link CircuitBreakerPlugin#getCircuitBreaker(Settings)}.
|
||||
*
|
||||
* This reference should never change throughout the lifetime of the node.
|
||||
*
|
||||
* @param circuitBreaker The constructed {@link CircuitBreaker} object from the {@link BreakerSettings}
|
||||
* provided by {@link CircuitBreakerPlugin#getCircuitBreaker(Settings)}
|
||||
*/
|
||||
void setCircuitBreaker(CircuitBreaker circuitBreaker);
|
||||
}
|
@ -33,6 +33,7 @@ import org.junit.Before;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
@ -357,6 +358,7 @@ public class BigArraysTests extends ESTestCase {
|
||||
.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), maxSize, ByteSizeUnit.BYTES)
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.build(),
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST).withCircuitBreaking();
|
||||
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
|
||||
@ -421,6 +423,7 @@ public class BigArraysTests extends ESTestCase {
|
||||
.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), maxSize, ByteSizeUnit.BYTES)
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.build(),
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
BigArrays bigArrays = new BigArrays(null, hcbs, CircuitBreaker.REQUEST);
|
||||
return (withBreaking ? bigArrays.withCircuitBreaking() : bigArrays);
|
||||
|
@ -89,7 +89,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
|
||||
.build();
|
||||
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
circuitBreakerService = new HierarchyCircuitBreakerService(settings, clusterSettings);
|
||||
circuitBreakerService = new HierarchyCircuitBreakerService(settings, Collections.emptyList(), clusterSettings);
|
||||
settingsA = Settings.builder().put("node.name", "TS_A").put(settings).build();
|
||||
serviceA = build(settingsA, version0);
|
||||
nodeA = serviceA.getLocalDiscoNode();
|
||||
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.breaker;
|
||||
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class BreakerSettingsTests extends ESTestCase {
|
||||
|
||||
public void testFromSettings() {
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(BreakerSettings.breakerLimitSettingKey("foo"), "100b")
|
||||
.put(BreakerSettings.breakerLimitSettingKey("bar"), "150b")
|
||||
.put(BreakerSettings.breakerOverheadSettingKey("bar"), 2.5)
|
||||
.put(BreakerSettings.breakerTypeSettingKey("bar"), CircuitBreaker.Type.MEMORY)
|
||||
.build();
|
||||
|
||||
BreakerSettings breakerFoo = BreakerSettings.updateFromSettings(new BreakerSettings("foo",
|
||||
10L,
|
||||
1.2d,
|
||||
CircuitBreaker.Type.NOOP,
|
||||
CircuitBreaker.Durability.TRANSIENT),
|
||||
clusterSettings);
|
||||
assertThat(breakerFoo.getDurability(), equalTo(CircuitBreaker.Durability.TRANSIENT));
|
||||
assertThat(breakerFoo.getLimit(), equalTo(100L));
|
||||
assertThat(breakerFoo.getOverhead(), equalTo(1.2));
|
||||
assertThat(breakerFoo.getType(), equalTo(CircuitBreaker.Type.NOOP));
|
||||
|
||||
BreakerSettings breakerBar = BreakerSettings.updateFromSettings(new BreakerSettings("bar",
|
||||
5L,
|
||||
0.5d,
|
||||
CircuitBreaker.Type.NOOP,
|
||||
CircuitBreaker.Durability.PERMANENT),
|
||||
clusterSettings);
|
||||
assertThat(breakerBar.getDurability(), equalTo(CircuitBreaker.Durability.PERMANENT));
|
||||
assertThat(breakerBar.getLimit(), equalTo(150L));
|
||||
assertThat(breakerBar.getOverhead(), equalTo(2.5));
|
||||
assertThat(breakerBar.getType(), equalTo(CircuitBreaker.Type.MEMORY));
|
||||
}
|
||||
}
|
@ -30,16 +30,22 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
|
||||
public void testThreadedUpdatesToChildBreaker() throws Exception {
|
||||
final int NUM_THREADS = scaledRandomIntBetween(3, 15);
|
||||
final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500);
|
||||
@ -49,6 +55,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
|
||||
final AtomicReference<ChildMemoryCircuitBreaker> breakerRef = new AtomicReference<>(null);
|
||||
final CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
|
||||
|
||||
@Override
|
||||
@ -107,6 +114,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
final AtomicInteger parentTripped = new AtomicInteger(0);
|
||||
final AtomicReference<ChildMemoryCircuitBreaker> breakerRef = new AtomicReference<>(null);
|
||||
final CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
|
||||
|
||||
@Override
|
||||
@ -166,12 +174,11 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
assertThat("total breaker was tripped at least once", tripped.get(), greaterThanOrEqualTo(1));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that a breaker correctly redistributes to a different breaker, in
|
||||
* this case, the request breaker borrows space from the fielddata breaker
|
||||
*/
|
||||
public void testBorrowingSiblingBreakerMemory() throws Exception {
|
||||
public void testBorrowingSiblingBreakerMemory() {
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200mb")
|
||||
@ -179,6 +186,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
|
||||
.build();
|
||||
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
|
||||
CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST);
|
||||
CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(CircuitBreaker.FIELDDATA);
|
||||
@ -207,7 +215,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testParentBreaksOnRealMemoryUsage() throws Exception {
|
||||
public void testParentBreaksOnRealMemoryUsage() {
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE)
|
||||
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200b")
|
||||
@ -217,6 +225,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
|
||||
AtomicLong memoryUsage = new AtomicLong();
|
||||
final CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
|
||||
@Override
|
||||
long currentMemoryUsage() {
|
||||
@ -269,6 +278,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
|
||||
.build();
|
||||
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
|
||||
CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST);
|
||||
CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(CircuitBreaker.FIELDDATA);
|
||||
@ -294,16 +304,17 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testAllocationBucketsBreaker() throws Exception {
|
||||
public void testAllocationBucketsBreaker() {
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b")
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "false")
|
||||
.build();
|
||||
|
||||
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
|
||||
try (HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
|
||||
|
||||
long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit();
|
||||
long parentLimitBytes = service.getParentLimit();
|
||||
assertEquals(new ByteSizeValue(100, ByteSizeUnit.BYTES).getBytes(), parentLimitBytes);
|
||||
|
||||
CircuitBreaker breaker = service.getBreaker(CircuitBreaker.REQUEST);
|
||||
@ -320,7 +331,39 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
private long mb(long size) {
|
||||
public void testRegisterCustomCircuitBreakers_WithDuplicates() {
|
||||
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
|
||||
() -> new HierarchyCircuitBreakerService(
|
||||
Settings.EMPTY,
|
||||
Collections.singletonList(new BreakerSettings(CircuitBreaker.FIELDDATA, 100, 1.2)),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
assertThat(iae.getMessage(),
|
||||
containsString("More than one circuit breaker with the name [fielddata] exists. Circuit breaker names must be unique"));
|
||||
|
||||
iae = expectThrows(IllegalArgumentException.class,
|
||||
() -> new HierarchyCircuitBreakerService(
|
||||
Settings.EMPTY,
|
||||
Arrays.asList(new BreakerSettings("foo", 100, 1.2), new BreakerSettings("foo", 200, 0.1)),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
assertThat(iae.getMessage(),
|
||||
containsString("More than one circuit breaker with the name [foo] exists. Circuit breaker names must be unique"));
|
||||
}
|
||||
|
||||
public void testCustomCircuitBreakers() {
|
||||
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(
|
||||
Settings.EMPTY,
|
||||
Arrays.asList(new BreakerSettings("foo", 100, 1.2), new BreakerSettings("bar", 200, 0.1)),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
|
||||
assertThat(service.getBreaker("foo"), is(not(nullValue())));
|
||||
assertThat(service.getBreaker("foo").getOverhead(), equalTo(1.2));
|
||||
assertThat(service.getBreaker("foo").getLimit(), equalTo(100L));
|
||||
assertThat(service.getBreaker("bar"), is(not(nullValue())));
|
||||
assertThat(service.getBreaker("bar").getOverhead(), equalTo(0.1));
|
||||
assertThat(service.getBreaker("bar").getLimit(), equalTo(200L));
|
||||
}
|
||||
}
|
||||
|
||||
private static long mb(long size) {
|
||||
return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes();
|
||||
}
|
||||
}
|
||||
|
@ -20,17 +20,12 @@
|
||||
package org.elasticsearch.indices.memory.breaker;
|
||||
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
* Unit tests for the circuit breaker
|
||||
@ -65,16 +60,4 @@ public class CircuitBreakerUnitTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testRegisterCustomBreaker() throws Exception {
|
||||
CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
String customName = "custom";
|
||||
BreakerSettings settings = new BreakerSettings(customName, 20, 1.0);
|
||||
service.registerBreaker(settings);
|
||||
|
||||
CircuitBreaker breaker = service.getBreaker(customName);
|
||||
assertThat(breaker, notNullValue());
|
||||
assertThat(breaker, instanceOf(CircuitBreaker.class));
|
||||
assertThat(breaker.getName(), is(customName));
|
||||
}
|
||||
}
|
||||
|
@ -19,9 +19,11 @@
|
||||
package org.elasticsearch.node;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.bootstrap.BootstrapCheck;
|
||||
import org.elasticsearch.bootstrap.BootstrapContext;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
@ -30,6 +32,9 @@ import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine.Searcher;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.plugins.CircuitBreakerPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
@ -50,6 +55,10 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
|
||||
public class NodeTests extends ESTestCase {
|
||||
@ -288,4 +297,45 @@ public class NodeTests extends ESTestCase {
|
||||
shard.store().decRef();
|
||||
assertThat(e.getMessage(), containsString("Something is leaking index readers or store references"));
|
||||
}
|
||||
|
||||
public void testCreateWithCircuitBreakerPlugins() throws IOException {
|
||||
Settings.Builder settings = baseSettings()
|
||||
.put("breaker.test_breaker.limit", "50b");
|
||||
List<Class<? extends Plugin>> plugins = basePlugins();
|
||||
plugins.add(MockCircuitBreakerPlugin.class);
|
||||
try (Node node = new MockNode(settings.build(), plugins)) {
|
||||
CircuitBreakerService service = node.injector().getInstance(CircuitBreakerService.class);
|
||||
assertThat(service.getBreaker("test_breaker"), is(not(nullValue())));
|
||||
assertThat(service.getBreaker("test_breaker").getLimit(), equalTo(50L));
|
||||
CircuitBreakerPlugin breakerPlugin = node.getPluginsService().filterPlugins(CircuitBreakerPlugin.class).get(0);
|
||||
assertTrue(breakerPlugin instanceof MockCircuitBreakerPlugin);
|
||||
assertSame("plugin circuit breaker instance is not the same as breaker service's instance",
|
||||
((MockCircuitBreakerPlugin)breakerPlugin).myCircuitBreaker.get(),
|
||||
service.getBreaker("test_breaker"));
|
||||
}
|
||||
}
|
||||
|
||||
public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin {
|
||||
|
||||
private SetOnce<CircuitBreaker> myCircuitBreaker = new SetOnce<>();
|
||||
|
||||
public MockCircuitBreakerPlugin() {}
|
||||
|
||||
@Override
|
||||
public BreakerSettings getCircuitBreaker(Settings settings) {
|
||||
return BreakerSettings.updateFromSettings(
|
||||
new BreakerSettings("test_breaker",
|
||||
100L,
|
||||
1.0d,
|
||||
CircuitBreaker.Type.MEMORY,
|
||||
CircuitBreaker.Durability.TRANSIENT),
|
||||
settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
|
||||
assertThat(circuitBreaker.getName(), equalTo("test_breaker"));
|
||||
myCircuitBreaker.set(circuitBreaker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -84,6 +84,7 @@ public class RestControllerTests extends ESTestCase {
|
||||
// We want to have reproducible results in this test, hence we disable real memory usage accounting
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.build(),
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
usageService = new UsageService();
|
||||
// we can do this here only because we know that we don't adjust breaker settings dynamically in the test
|
||||
|
@ -84,6 +84,7 @@ public class RestHttpResponseHeadersTests extends ESTestCase {
|
||||
|
||||
// Initialize test candidate RestController
|
||||
CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService(Settings.EMPTY,
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
|
||||
final Settings settings = Settings.EMPTY;
|
||||
|
@ -377,7 +377,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
||||
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
|
||||
final Engine.Warmer warmer = createTestWarmer(indexSettings);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
|
||||
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings,
|
||||
Collections.emptyList(),
|
||||
clusterSettings);
|
||||
indexShard = new IndexShard(
|
||||
routing,
|
||||
indexSettings,
|
||||
|
@ -28,6 +28,7 @@ import org.hamcrest.Matchers;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -147,6 +148,7 @@ public class FrozenEngineTests extends EngineTestCase {
|
||||
EngineConfig config = config(defaultSettings, store, createTempDir(),
|
||||
NoMergePolicy.INSTANCE, // we don't merge we want no background merges to happen to ensure we have consistent breaker stats
|
||||
null, listener, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(),
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING);
|
||||
final int docs;
|
||||
@ -203,6 +205,7 @@ public class FrozenEngineTests extends EngineTestCase {
|
||||
try (Store store = createStore()) {
|
||||
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get,
|
||||
new HierarchyCircuitBreakerService(defaultSettings.getSettings(),
|
||||
Collections.emptyList(),
|
||||
new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING);
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
|
@ -35,6 +35,7 @@ import org.elasticsearch.xpack.spatial.index.fielddata.TriangleTreeReader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils.LATITUDE_MASK;
|
||||
@ -478,9 +479,9 @@ public class GeoGridTilerTests extends ESTestCase {
|
||||
numBytes = values.getValuesBytes();
|
||||
}
|
||||
|
||||
CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
service.registerBreaker(new BreakerSettings("limited", numBytes - 1, 1.0));
|
||||
CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY,
|
||||
Collections.singletonList(new BreakerSettings("limited", numBytes - 1, 1.0)),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
CircuitBreaker limitedBreaker = service.getBreaker("limited");
|
||||
|
||||
Consumer<Long> circuitBreakerConsumer = (l) -> limitedBreaker.addEstimateBytesAndMaybeBreak(l, "agg");
|
||||
|
Loading…
x
Reference in New Issue
Block a user