fixup! add support for registering custom circuit breaker

This commit is contained in:
Sebastian Utz 2014-12-17 15:48:43 +01:00
parent b9843dbda9
commit 3f51352b54
29 changed files with 106 additions and 281 deletions

View File

@ -38,7 +38,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
private final AtomicLong trippedCount;
private final ESLogger logger;
private final HierarchyCircuitBreakerService parent;
private final Name name;
private final String name;
/**
* Create a circuit breaker that will break if the number of estimated
@ -49,7 +49,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
* @param name the name of the breaker
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, ESLogger logger,
HierarchyCircuitBreakerService parent, Name name) {
HierarchyCircuitBreakerService parent, String name) {
this(settings, null, logger, parent, name);
}
@ -64,7 +64,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
* @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
ESLogger logger, HierarchyCircuitBreakerService parent, Name name) {
ESLogger logger, HierarchyCircuitBreakerService parent, String name) {
this.name = name;
this.settings = settings;
this.memoryBytesLimit = settings.getLimit();
@ -220,7 +220,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
/**
* @return the name of the breaker
*/
public Name getName() {
public String getName() {
return this.name;
}
}

View File

@ -20,13 +20,8 @@
package org.elasticsearch.common.breaker;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
* Interface for an object that can be incremented, breaking after some
@ -34,77 +29,9 @@ import java.util.Map;
*/
public interface CircuitBreaker {
/**
* Enum used for specifying different types of circuit breakers
*/
public static class Name {
private static Map<Integer, Name> names = new HashMap<>();
public static final Name PARENT = register(0, "parent");
public static final Name FIELDDATA = register(1, "fielddata");
public static final Name REQUEST = register(2, "request");
private final int id;
private final String label;
Name(int ordinal, String label) {
this.id = ordinal;
this.label = label;
}
public int getSerializableValue() {
return this.id;
}
public String toString() {
return label.toUpperCase(Locale.ENGLISH);
}
public static Name register(int id, String label) {
if (names.containsKey(id)) {
throw new ElasticsearchIllegalArgumentException(
String.format(Locale.ENGLISH,
"CircuitBreaker.Name with id %d already registered", id));
}
Name name = new Name(id, label);
names.put(id, name);
return name;
}
public static Name readFrom(StreamInput in) throws IOException {
int value = in.readVInt();
Name name = names.get(value);
if (name == null) {
throw new ElasticsearchIllegalArgumentException("No CircuitBreaker.Name with id: " + value);
}
return name;
}
public static void writeTo(Name name, StreamOutput out) throws IOException {
out.writeVInt(name.getSerializableValue());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Name name = (Name) o;
if (id != name.id) return false;
if (label != null ? !label.equals(name.label) : name.label != null) return false;
return true;
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + (label != null ? label.hashCode() : 0);
return result;
}
}
public static final String PARENT = "PARENT";
public static final String FIELDDATA = "FIELDDATA";
public static final String REQUEST = "REQUEST";
public static enum Type {
// A regular or child MemoryCircuitBreaker
@ -172,5 +99,5 @@ public interface CircuitBreaker {
/**
* @return the name of the breaker
*/
public Name getName();
public String getName();
}

View File

@ -186,7 +186,7 @@ public class MemoryCircuitBreaker implements CircuitBreaker {
/**
* @return the name of the breaker
*/
public Name getName() {
return Name.FIELDDATA;
public String getName() {
return FIELDDATA;
}
}

View File

@ -25,9 +25,9 @@ package org.elasticsearch.common.breaker;
*/
public class NoopCircuitBreaker implements CircuitBreaker {
private final Name name;
private final String name;
public NoopCircuitBreaker(Name name) {
public NoopCircuitBreaker(String name) {
this.name = name;
}
@ -67,7 +67,7 @@ public class NoopCircuitBreaker implements CircuitBreaker {
}
@Override
public Name getName() {
public String getName() {
return this.name;
}
}

View File

@ -387,7 +387,7 @@ public class BigArrays extends AbstractComponent {
*/
void adjustBreaker(long delta) {
if (this.breakerService != null) {
CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.Name.REQUEST);
CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.REQUEST);
if (this.checkBreaker == true) {
// checking breaker means potentially tripping, but it doesn't
// have to if the delta is negative

View File

@ -53,7 +53,7 @@ public enum GlobalOrdinalsBuilder {
}
final OrdinalMap ordinalMap = OrdinalMap.build(null, subs, PackedInts.DEFAULT);
final long memorySizeInBytes = ordinalMap.ramBytesUsed();
breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(memorySizeInBytes);
breakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(memorySizeInBytes);
if (logger.isDebugEnabled()) {
logger.debug(

View File

@ -75,7 +75,7 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA));
if (terms == null) {
data = AtomicDoubleFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -64,7 +64,7 @@ public class FSTBytesIndexFieldData extends AbstractIndexOrdinalsFieldData {
Terms terms = reader.terms(getFieldNames().indexName());
AtomicOrdinalsFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA));
if (terms == null) {
data = AbstractAtomicOrdinalsFieldData.empty();
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -73,7 +73,7 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<AtomicNumer
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA));
if (terms == null) {
data = AtomicDoubleFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -83,7 +83,7 @@ public class GeoPointCompressedIndexFieldData extends AbstractIndexGeoPointField
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA));
if (terms == null) {
data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -65,7 +65,7 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractIndexGeoPointFiel
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA));
if (terms == null) {
data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -90,7 +90,7 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
final LeafReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getNumericType(), getFieldNames().fullName());
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA), getNumericType(), getFieldNames().fullName());
if (terms == null) {
data = AtomicLongFieldData.empty(reader.maxDoc());
estimator.adjustForNoTerms(data.ramBytesUsed());

View File

@ -62,7 +62,7 @@ public class PagedBytesIndexFieldData extends AbstractIndexOrdinalsFieldData {
LeafReader reader = context.reader();
AtomicOrdinalsFieldData data = null;
PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getFieldNames().fullName());
PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(CircuitBreaker.FIELDDATA), getFieldNames().fullName());
Terms terms = reader.terms(getFieldNames().indexName());
if (terms == null) {
data = AbstractAtomicOrdinalsFieldData.empty();

View File

@ -102,7 +102,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
new ParentChildIntersectTermsEnum(reader, UidFieldMapper.NAME, ParentFieldMapper.NAME),
parentTypes
);
ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), termsEnum);
ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(CircuitBreaker.FIELDDATA), termsEnum);
TermsEnum estimatedTermsEnum = estimator.beforeLoad(null);
ObjectObjectOpenHashMap<String, TypeBuilder> typeBuilders = ObjectObjectOpenHashMap.newInstance();
try {
@ -338,7 +338,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
}
breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed);
breakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(ramBytesUsed);
if (logger.isDebugEnabled()) {
logger.debug(
"Global-ordinals[_parent] took {}",

View File

@ -19,7 +19,6 @@
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -48,9 +47,9 @@ public class AllCircuitBreakerStats implements Streamable, ToXContent {
return this.allStats;
}
public CircuitBreakerStats getStats(CircuitBreaker.Name name) {
public CircuitBreakerStats getStats(String name) {
for (CircuitBreakerStats stats : allStats) {
if (stats.getName() == name) {
if (stats.getName().equals(name)) {
return stats;
}
}

View File

@ -27,23 +27,23 @@ import org.elasticsearch.common.unit.ByteSizeValue;
*/
public class BreakerSettings {
private final CircuitBreaker.Name name;
private final String name;
private final long limitBytes;
private final double overhead;
private final CircuitBreaker.Type type;
public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead) {
public BreakerSettings(String name, long limitBytes, double overhead) {
this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY);
}
public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead, CircuitBreaker.Type type) {
public BreakerSettings(String name, long limitBytes, double overhead, CircuitBreaker.Type type) {
this.name = name;
this.limitBytes = limitBytes;
this.overhead = overhead;
this.type = type;
}
public CircuitBreaker.Name getName() {
public String getName() {
return this.name;
}
@ -61,7 +61,7 @@ public class BreakerSettings {
@Override
public String toString() {
return "[" + this.name.toString() +
return "[" + this.name +
",type=" + this.type.toString() +
",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) +
",overhead=" + this.overhead + "]";

View File

@ -44,7 +44,7 @@ public abstract class CircuitBreakerService extends AbstractLifecycleComponent<C
/**
* @return the breaker that can be used to register estimates against
*/
public abstract CircuitBreaker getBreaker(CircuitBreaker.Name type);
public abstract CircuitBreaker getBreaker(String name);
/**
* @return stats about all breakers
@ -54,7 +54,7 @@ public abstract class CircuitBreakerService extends AbstractLifecycleComponent<C
/**
* @return stats about a specific breaker
*/
public abstract CircuitBreakerStats stats(CircuitBreaker.Name name);
public abstract CircuitBreakerStats stats(String name);
protected void doStart() throws ElasticsearchException {
}

View File

@ -19,8 +19,6 @@
package org.elasticsearch.indices.breaker;
import org.elasticsearch.Version;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -37,7 +35,7 @@ import java.util.Locale;
*/
public class CircuitBreakerStats implements Streamable, ToXContent {
private CircuitBreaker.Name name;
private String name;
private long limit;
private long estimated;
private long trippedCount;
@ -47,7 +45,7 @@ public class CircuitBreakerStats implements Streamable, ToXContent {
}
public CircuitBreakerStats(CircuitBreaker.Name name, long limit, long estimated, double overhead, long trippedCount) {
public CircuitBreakerStats(String name, long limit, long estimated, double overhead, long trippedCount) {
this.name = name;
this.limit = limit;
this.estimated = estimated;
@ -55,7 +53,7 @@ public class CircuitBreakerStats implements Streamable, ToXContent {
this.overhead = overhead;
}
public CircuitBreaker.Name getName() {
public String getName() {
return this.name;
}
@ -87,7 +85,7 @@ public class CircuitBreakerStats implements Streamable, ToXContent {
estimated = in.readLong();
overhead = in.readDouble();
this.trippedCount = in.readLong();
this.name = CircuitBreaker.Name.readFrom(in);
this.name = in.readString();
}
@Override
@ -96,12 +94,12 @@ public class CircuitBreakerStats implements Streamable, ToXContent {
out.writeLong(estimated);
out.writeDouble(overhead);
out.writeLong(trippedCount);
CircuitBreaker.Name.writeTo(name, out);
out.writeString(name);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name.toString().toLowerCase(Locale.ROOT));
builder.startObject(name.toLowerCase(Locale.ROOT));
builder.field(Fields.LIMIT, limit);
builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit));
builder.field(Fields.ESTIMATED, estimated);
@ -114,7 +112,7 @@ public class CircuitBreakerStats implements Streamable, ToXContent {
@Override
public String toString() {
return "[" + this.name.toString() +
return "[" + this.name +
",limit=" + this.limit + "/" + new ByteSizeValue(this.limit) +
",estimated=" + this.estimated + "/" + new ByteSizeValue(this.estimated) +
",overhead=" + this.overhead + ",tripped=" + this.trippedCount + "]";

View File

@ -19,9 +19,6 @@
package org.elasticsearch.indices.breaker;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -32,10 +29,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.collect.Lists.newArrayList;
@ -46,7 +42,7 @@ import static com.google.common.collect.Lists.newArrayList;
*/
public class HierarchyCircuitBreakerService extends CircuitBreakerService {
private volatile ImmutableMap<CircuitBreaker.Name, CircuitBreaker> breakers;
private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap();
// Old pre-1.4.0 backwards compatible settings
public static final String OLD_CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
@ -68,20 +64,12 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final String DEFAULT_BREAKER_TYPE = "memory";
private static final Set<CircuitBreaker.Name> BUILT_IN_BREAKER_NAMES = ImmutableSet.<CircuitBreaker.Name>builder()
.add(CircuitBreaker.Name.PARENT)
.add(CircuitBreaker.Name.FIELDDATA)
.add(CircuitBreaker.Name.REQUEST)
.build();
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings requestSettings;
// Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0);
private final Object lock = new Object();
@Inject
public HierarchyCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
@ -105,48 +93,26 @@ private volatile BreakerSettings parentSettings;
compatibilityFielddataOverheadDefault = compatibilityFielddataOverhead;
}
this.fielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA,
this.fielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA,
settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, compatibilityFielddataLimitDefault).bytes(),
settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault),
CircuitBreaker.Type.parseValue(settings.get(FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, DEFAULT_BREAKER_TYPE))
);
this.requestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST,
this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST,
settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_REQUEST_BREAKER_LIMIT).bytes(),
settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0),
CircuitBreaker.Type.parseValue(settings.get(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, DEFAULT_BREAKER_TYPE))
);
// Validate the configured settings
validateSettings(new BreakerSettings[] {this.requestSettings, this.fielddataSettings});
this.parentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT,
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT,
settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0, CircuitBreaker.Type.PARENT);
if (logger.isTraceEnabled()) {
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
}
Map<CircuitBreaker.Name, CircuitBreaker> tempBreakers = new HashMap<>();
CircuitBreaker fielddataBreaker;
if (fielddataSettings.getType() == CircuitBreaker.Type.NOOP) {
fielddataBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA);
} else {
fielddataBreaker = new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA);
}
CircuitBreaker requestBreaker;
if (requestSettings.getType() == CircuitBreaker.Type.NOOP) {
requestBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.REQUEST);
} else {
requestBreaker = new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST);
}
tempBreakers.put(CircuitBreaker.Name.FIELDDATA, fielddataBreaker);
tempBreakers.put(CircuitBreaker.Name.REQUEST, requestBreaker);
synchronized (lock) {
this.breakers = ImmutableMap.copyOf(tempBreakers);
}
registerBreaker(this.requestSettings);
registerBreaker(this.fielddataSettings);
nodeSettingsService.addListener(new ApplySettings());
}
@ -155,75 +121,43 @@ private volatile BreakerSettings parentSettings;
@Override
public void onRefreshSettings(Settings settings) {
boolean changed = false;
// Fielddata settings
BreakerSettings newFielddataSettings = HierarchyCircuitBreakerService.this.fielddataSettings;
ByteSizeValue newFielddataMax = settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, null);
Double newFielddataOverhead = settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
if (newFielddataMax != null || newFielddataOverhead != null) {
changed = true;
long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes();
newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead;
newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead,
BreakerSettings newFielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead,
HierarchyCircuitBreakerService.this.fielddataSettings.getType());
registerBreaker(newFielddataSettings);
HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
logger.info("Updated breaker settings fielddata: {}", newFielddataSettings);
}
// Request settings
BreakerSettings newRequestSettings = HierarchyCircuitBreakerService.this.requestSettings;
ByteSizeValue newRequestMax = settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, null);
Double newRequestOverhead = settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
if (newRequestMax != null || newRequestOverhead != null) {
changed = true;
long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes();
newRequestOverhead = newRequestOverhead == null ? HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead;
newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead,
BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestLimitBytes, newRequestOverhead,
HierarchyCircuitBreakerService.this.requestSettings.getType());
registerBreaker(newRequestSettings);
HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings;
logger.info("Updated breaker settings request: {}", newRequestSettings);
}
// Parent settings
BreakerSettings newParentSettings = HierarchyCircuitBreakerService.this.parentSettings;
long oldParentMax = HierarchyCircuitBreakerService.this.parentSettings.getLimit();
ByteSizeValue newParentMax = settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, null);
if (newParentMax != null && (newParentMax.bytes() != oldParentMax)) {
changed = true;
newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0, CircuitBreaker.Type.PARENT);
}
if (changed) {
// change all the things
validateSettings(new BreakerSettings[]{newFielddataSettings, newRequestSettings});
logger.info("Updating settings parent: {}, fielddata: {}, request: {}", newParentSettings, newFielddataSettings, newRequestSettings);
BreakerSettings newParentSettings = new BreakerSettings(CircuitBreaker.PARENT, newParentMax.bytes(), 1.0, CircuitBreaker.Type.PARENT);
validateSettings(new BreakerSettings[]{newParentSettings});
HierarchyCircuitBreakerService.this.parentSettings = newParentSettings;
HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings;
Map<CircuitBreaker.Name, CircuitBreaker> tempBreakers = new HashMap<>();
CircuitBreaker fielddataBreaker;
if (newFielddataSettings.getType() == CircuitBreaker.Type.NOOP) {
fielddataBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA);
} else {
fielddataBreaker = new ChildMemoryCircuitBreaker(newFielddataSettings,
(ChildMemoryCircuitBreaker) HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA),
logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA);
}
CircuitBreaker requestBreaker;
if (newRequestSettings.getType() == CircuitBreaker.Type.NOOP) {
requestBreaker = new NoopCircuitBreaker(CircuitBreaker.Name.REQUEST);
} else {
requestBreaker = new ChildMemoryCircuitBreaker(newRequestSettings,
(ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST),
logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST);
}
tempBreakers.put(CircuitBreaker.Name.FIELDDATA, fielddataBreaker);
tempBreakers.put(CircuitBreaker.Name.REQUEST, requestBreaker);
synchronized (lock) {
HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers);
}
logger.info("Updated breaker settings parent: {}", newParentSettings);
}
}
}
@ -246,7 +180,7 @@ private volatile BreakerSettings parentSettings;
}
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name name) {
public CircuitBreaker getBreaker(String name) {
return this.breakers.get(name);
}
@ -261,13 +195,13 @@ private volatile BreakerSettings parentSettings;
parentEstimated += breaker.getUsed();
}
// Manually add the parent breaker settings since they aren't part of the breaker map
allStats.add(new CircuitBreakerStats(CircuitBreaker.Name.PARENT, parentSettings.getLimit(),
allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
parentEstimated, 1.0, parentTripCount.get()));
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
}
@Override
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
public CircuitBreakerStats stats(String name) {
CircuitBreaker breaker = this.breakers.get(name);
return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
}
@ -294,36 +228,25 @@ private volatile BreakerSettings parentSettings;
}
/**
* Allows to register of a custom circuit breaker.
*
* Warning: Will overwrite any existing custom breaker with the same
* {@link CircuitBreaker.Name}.
* Trying to overwrite a built-in breaker like e.g. {@link CircuitBreaker.Name.REQUEST}
* will fail with an {@link ElasticsearchIllegalArgumentException}.
* Allows to register a custom circuit breaker.
* Warning: Will overwrite any existing custom breaker with the same name.
*
* @param breakerSettings
*/
@Override
public void registerBreaker(BreakerSettings breakerSettings) {
// Validate the configured settings
if (BUILT_IN_BREAKER_NAMES.contains(breakerSettings.getName())) {
throw new ElasticsearchIllegalArgumentException(
"Overwriting of built-in breaker " + breakerSettings.getName() + " is forbidden");
}
// Validate the settings
validateSettings(new BreakerSettings[] {breakerSettings});
CircuitBreaker breaker;
if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) {
breaker = new NoopCircuitBreaker(breakerSettings.getName());
} else {
breaker = new ChildMemoryCircuitBreaker(breakerSettings, logger, this, breakerSettings.getName());
CircuitBreaker oldBreaker = breakers.get(breakerSettings.getName());
breaker = new ChildMemoryCircuitBreaker(breakerSettings,
(ChildMemoryCircuitBreaker)oldBreaker, logger, this, breakerSettings.getName());
}
Map<CircuitBreaker.Name, CircuitBreaker> tempBreakers = new HashMap<>();
tempBreakers.putAll(this.breakers);
tempBreakers.put(breakerSettings.getName(), breaker);
synchronized (lock) {
this.breakers = ImmutableMap.copyOf(tempBreakers);
}
breakers.put(breakerSettings.getName(), breaker);
}
}

View File

@ -28,25 +28,25 @@ import org.elasticsearch.common.settings.ImmutableSettings;
*/
public class NoneCircuitBreakerService extends CircuitBreakerService {
private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.Name.FIELDDATA);
private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.FIELDDATA);
public NoneCircuitBreakerService() {
super(ImmutableSettings.EMPTY);
}
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name name) {
public CircuitBreaker getBreaker(String name) {
return breaker;
}
@Override
public AllCircuitBreakerStats stats() {
return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.Name.FIELDDATA)});
return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.FIELDDATA)});
}
@Override
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, -1, -1, 0, 0);
public CircuitBreakerStats stats(String name) {
return new CircuitBreakerStats(CircuitBreaker.FIELDDATA, -1, -1, 0, 0);
}
@Override

View File

@ -49,7 +49,7 @@ public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listen
@Override
public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
circuitBreakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(-sizeInBytes);
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
}

View File

@ -94,7 +94,7 @@ public class MemoryCircuitBreakerTests extends ElasticsearchTestCase {
final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) {
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name type) {
public CircuitBreaker getBreaker(String name) {
return breakerRef.get();
}
@ -103,9 +103,9 @@ public class MemoryCircuitBreakerTests extends ElasticsearchTestCase {
// never trip
}
};
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0);
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0);
final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger,
(HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST);
(HierarchyCircuitBreakerService)service, CircuitBreaker.REQUEST);
breakerRef.set(breaker);
for (int i = 0; i < NUM_THREADS; i++) {
@ -155,23 +155,23 @@ public class MemoryCircuitBreakerTests extends ElasticsearchTestCase {
final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) {
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name type) {
public CircuitBreaker getBreaker(String name) {
return breakerRef.get();
}
@Override
public void checkParentLimit(String label) throws CircuitBreakingException {
// Parent will trip right before regular breaker would trip
if (getBreaker(CircuitBreaker.Name.REQUEST).getUsed() > parentLimit) {
if (getBreaker(CircuitBreaker.REQUEST).getUsed() > parentLimit) {
parentTripped.incrementAndGet();
logger.info("--> parent tripped");
throw new CircuitBreakingException("parent tripped");
}
}
};
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, childLimit, 1.0);
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.REQUEST, childLimit, 1.0);
final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger,
(HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST);
(HierarchyCircuitBreakerService)service, CircuitBreaker.REQUEST);
breakerRef.set(breaker);
for (int i = 0; i < NUM_THREADS; i++) {

View File

@ -347,7 +347,7 @@ public class BigArraysTests extends ElasticsearchSingleNodeTest {
} catch (InvocationTargetException e) {
assertTrue(e.getCause() instanceof CircuitBreakingException);
}
assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
assertEquals(0, hcbs.getBreaker(CircuitBreaker.REQUEST).getUsed());
}
}
@ -373,9 +373,9 @@ public class BigArraysTests extends ElasticsearchSingleNodeTest {
break;
}
}
assertEquals(array.ramBytesUsed(), hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
assertEquals(array.ramBytesUsed(), hcbs.getBreaker(CircuitBreaker.REQUEST).getUsed());
array.close();
assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
assertEquals(0, hcbs.getBreaker(CircuitBreaker.REQUEST).getUsed());
}
}

View File

@ -94,10 +94,10 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
private boolean noopBreakerUsed() {
NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().setBreaker(true).get();
for (NodeStats nodeStats : stats) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit() == 0) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == 0) {
return true;
}
if (nodeStats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getLimit() == 0) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getLimit() == 0) {
return true;
}
}
@ -144,7 +144,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA);
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.FIELDDATA);
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
@ -194,7 +194,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA);
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.FIELDDATA);
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
@ -223,7 +223,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
// We need the request limit beforehand, just from a single node because the limit should always be the same
long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get()
.getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit();
.getNodes()[0].getBreaker().getStats(CircuitBreaker.REQUEST).getLimit();
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b")
@ -308,7 +308,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
.clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS));
for (NodeStats nStats : resp.getNodes()) {
assertThat("fielddata breaker never reset back to 0",
nStats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(),
nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(),
equalTo(0L));
}
}

View File

@ -19,9 +19,7 @@
package org.elasticsearch.indices.memory.breaker;
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -30,13 +28,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
/**
* Unit tests for the circuit breaker
@ -50,21 +42,21 @@ public class CircuitBreakerUnitTests extends ElasticsearchTestCase {
@Test
public void testBreakerSettingsValidationWithValidSettings() {
// parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20}
BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), 1.0);
BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0);
BreakerSettings fd = new BreakerSettings(CircuitBreaker.FIELDDATA, pctBytes("50%"), 1.0);
BreakerSettings request = new BreakerSettings(CircuitBreaker.REQUEST, pctBytes("20%"), 1.0);
HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
// parent: {:limit 70}, fd: {:limit 40}, request: {:limit 30}
fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("40%"), 1.0);
request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("30%"), 1.0);
fd = new BreakerSettings(CircuitBreaker.FIELDDATA, pctBytes("40%"), 1.0);
request = new BreakerSettings(CircuitBreaker.REQUEST, pctBytes("30%"), 1.0);
HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
}
@Test
public void testBreakerSettingsValidationNegativeOverhead() {
// parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20}
BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), -0.1);
BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0);
BreakerSettings fd = new BreakerSettings(CircuitBreaker.FIELDDATA, pctBytes("50%"), -0.1);
BreakerSettings request = new BreakerSettings(CircuitBreaker.REQUEST, pctBytes("20%"), 1.0);
try {
HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
fail("settings are invalid but validate settings did not throw an exception");
@ -77,7 +69,7 @@ public class CircuitBreakerUnitTests extends ElasticsearchTestCase {
@Test
public void testRegisterCustomBreaker() throws Exception {
CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY));
CircuitBreaker.Name customName = CircuitBreaker.Name.register(3, "custom");
String customName = "custom";
BreakerSettings settings = new BreakerSettings(customName, 20, 1.0);
service.registerBreaker(settings);
@ -87,18 +79,4 @@ public class CircuitBreakerUnitTests extends ElasticsearchTestCase {
assertThat(breaker.getName(), is(customName));
}
@Test
public void testRegisterBuiltInBreakerForbidden() throws Exception {
CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY));
BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, 20, 1.0);
try {
service.registerBreaker(settings);
fail("registering built-in breaker is forbidden but did not throw an exception");
} catch (Exception e) {
assertThat("Incorrect message: " + e.getMessage(),
e.getMessage().contains("Overwriting of built-in breaker " + CircuitBreaker.Name.FIELDDATA + " is forbidden"), equalTo(true));
}
}
}

View File

@ -60,7 +60,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException {
for (NodeStats node : client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet().getNodes()) {
assertThat("Breaker is not set to 0", node.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
assertThat("Breaker is not set to 0", node.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
}
String mapping = XContentFactory.jsonBuilder()
@ -146,7 +146,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : resp.getNodes()) {
assertThat("Breaker is set to 0", stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
assertThat("Breaker is set to 0", stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
}
for (int i = 0; i < numSearches; i++) {
@ -190,7 +190,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping,
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
}
}
}

View File

@ -242,7 +242,7 @@ public class CompositeTestCluster extends TestCluster {
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
}
// CompositeTestCluster does not check the request breaker,
// because checking it requires a network request, which in

View File

@ -147,7 +147,7 @@ public final class ExternalTestCluster extends TestCluster {
.clear().setBreaker(true).setIndices(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
// ExternalTestCluster does not check the request breaker,
// because checking it requires a network request, which in
// turn increments the breaker, making it non-0

View File

@ -1693,7 +1693,7 @@ public final class InternalTestCluster extends TestCluster {
final String name = nodeAndClient.name;
final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);
CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA);
CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.FIELDDATA);
assertThat("Fielddata breaker not reset to 0 on node: " + name, fdBreaker.getUsed(), equalTo(0L));
// Anything that uses transport or HTTP can increase the
// request breaker (because they use bigarrays), because of
@ -1707,7 +1707,7 @@ public final class InternalTestCluster extends TestCluster {
assertBusy(new Runnable() {
@Override
public void run() {
CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.Name.REQUEST);
CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L));
}
});