add support for registering custom circuit breaker

This commit is contained in:
Sebastian Utz 2014-12-05 12:26:59 +01:00
parent 86e1655e4b
commit b9843dbda9
6 changed files with 157 additions and 21 deletions

View File

@ -141,7 +141,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
}
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.warn("[{}] New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
logger.warn("[{}] New used memory {} [{}] for data of [{}] would be larger than configured breaker: {} [{}], breaking",
this.name,
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));

View File

@ -24,7 +24,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
* Interface for an object that can be incremented, breaking after some
@ -35,38 +37,73 @@ public interface CircuitBreaker {
/**
* Enum used for specifying different types of circuit breakers
*/
public static enum Name {
PARENT(0),
FIELDDATA(1),
REQUEST(2);
public static class Name {
private int ordinal;
private static Map<Integer, Name> names = new HashMap<>();
Name(int ordinal) {
this.ordinal = ordinal;
public static final Name PARENT = register(0, "parent");
public static final Name FIELDDATA = register(1, "fielddata");
public static final Name REQUEST = register(2, "request");
private final int id;
private final String label;
Name(int ordinal, String label) {
this.id = ordinal;
this.label = label;
}
public int getSerializableValue() {
return this.ordinal;
return this.id;
}
public String toString() {
return label.toUpperCase(Locale.ENGLISH);
}
public static Name register(int id, String label) {
if (names.containsKey(id)) {
throw new ElasticsearchIllegalArgumentException(
String.format(Locale.ENGLISH,
"CircuitBreaker.Name with id %d already registered", id));
}
Name name = new Name(id, label);
names.put(id, name);
return name;
}
public static Name readFrom(StreamInput in) throws IOException {
int value = in.readVInt();
switch (value) {
case 0:
return Name.PARENT;
case 1:
return Name.FIELDDATA;
case 2:
return Name.REQUEST;
default:
throw new ElasticsearchIllegalArgumentException("No CircuitBreaker with ordinal: " + value);
Name name = names.get(value);
if (name == null) {
throw new ElasticsearchIllegalArgumentException("No CircuitBreaker.Name with id: " + value);
}
return name;
}
public static void writeTo(Name name, StreamOutput out) throws IOException {
out.writeVInt(name.getSerializableValue());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Name name = (Name) o;
if (id != name.id) return false;
if (label != null ? !label.equals(name.label) : name.label != null) return false;
return true;
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + (label != null ? label.hashCode() : 0);
return result;
}
}
public static enum Type {

View File

@ -34,6 +34,13 @@ public abstract class CircuitBreakerService extends AbstractLifecycleComponent<C
super(settings);
}
/**
* Allows to register of a custom circuit breaker.
*
* @param breakerSettings
*/
public abstract void registerBreaker(BreakerSettings breakerSettings);
/**
* @return the breaker that can be used to register estimates against
*/

View File

@ -20,6 +20,8 @@
package org.elasticsearch.indices.breaker;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -33,6 +35,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.collect.Lists.newArrayList;
@ -65,12 +68,20 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final String DEFAULT_BREAKER_TYPE = "memory";
private volatile BreakerSettings parentSettings;
private static final Set<CircuitBreaker.Name> BUILT_IN_BREAKER_NAMES = ImmutableSet.<CircuitBreaker.Name>builder()
.add(CircuitBreaker.Name.PARENT)
.add(CircuitBreaker.Name.FIELDDATA)
.add(CircuitBreaker.Name.REQUEST)
.build();
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings requestSettings;
// Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0);
private final Object lock = new Object();
@Inject
public HierarchyCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
@ -133,7 +144,9 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
tempBreakers.put(CircuitBreaker.Name.FIELDDATA, fielddataBreaker);
tempBreakers.put(CircuitBreaker.Name.REQUEST, requestBreaker);
this.breakers = ImmutableMap.copyOf(tempBreakers);
synchronized (lock) {
this.breakers = ImmutableMap.copyOf(tempBreakers);
}
nodeSettingsService.addListener(new ApplySettings());
}
@ -208,7 +221,9 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
tempBreakers.put(CircuitBreaker.Name.FIELDDATA, fielddataBreaker);
tempBreakers.put(CircuitBreaker.Name.REQUEST, requestBreaker);
HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers);
synchronized (lock) {
HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers);
}
}
}
}
@ -277,4 +292,38 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
totalUsed, parentLimit);
}
}
/**
* Allows to register of a custom circuit breaker.
*
* Warning: Will overwrite any existing custom breaker with the same
* {@link CircuitBreaker.Name}.
* Trying to overwrite a built-in breaker like e.g. {@link CircuitBreaker.Name.REQUEST}
* will fail with an {@link ElasticsearchIllegalArgumentException}.
*
* @param breakerSettings
*/
@Override
public void registerBreaker(BreakerSettings breakerSettings) {
// Validate the configured settings
if (BUILT_IN_BREAKER_NAMES.contains(breakerSettings.getName())) {
throw new ElasticsearchIllegalArgumentException(
"Overwriting of built-in breaker " + breakerSettings.getName() + " is forbidden");
}
validateSettings(new BreakerSettings[] {breakerSettings});
CircuitBreaker breaker;
if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) {
breaker = new NoopCircuitBreaker(breakerSettings.getName());
} else {
breaker = new ChildMemoryCircuitBreaker(breakerSettings, logger, this, breakerSettings.getName());
}
Map<CircuitBreaker.Name, CircuitBreaker> tempBreakers = new HashMap<>();
tempBreakers.putAll(this.breakers);
tempBreakers.put(breakerSettings.getName(), breaker);
synchronized (lock) {
this.breakers = ImmutableMap.copyOf(tempBreakers);
}
}
}

View File

@ -48,4 +48,9 @@ public class NoneCircuitBreakerService extends CircuitBreakerService {
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, -1, -1, 0, 0);
}
@Override
public void registerBreaker(BreakerSettings breakerSettings) {
// ignore
}
}

View File

@ -19,14 +19,24 @@
package org.elasticsearch.indices.memory.breaker;
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.is;
/**
* Unit tests for the circuit breaker
@ -63,4 +73,32 @@ public class CircuitBreakerUnitTests extends ElasticsearchTestCase {
e.getMessage().contains("must be non-negative"), equalTo(true));
}
}
@Test
public void testRegisterCustomBreaker() throws Exception {
CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY));
CircuitBreaker.Name customName = CircuitBreaker.Name.register(3, "custom");
BreakerSettings settings = new BreakerSettings(customName, 20, 1.0);
service.registerBreaker(settings);
CircuitBreaker breaker = service.getBreaker(customName);
assertThat(breaker, notNullValue());
assertThat(breaker, instanceOf(CircuitBreaker.class));
assertThat(breaker.getName(), is(customName));
}
@Test
public void testRegisterBuiltInBreakerForbidden() throws Exception {
CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY));
BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, 20, 1.0);
try {
service.registerBreaker(settings);
fail("registering built-in breaker is forbidden but did not throw an exception");
} catch (Exception e) {
assertThat("Incorrect message: " + e.getMessage(),
e.getMessage().contains("Overwriting of built-in breaker " + CircuitBreaker.Name.FIELDDATA + " is forbidden"), equalTo(true));
}
}
}