fixup! fixup! add support for registering custom circuit breaker
This commit is contained in:
parent
3f51352b54
commit
358bb9bd75
|
@ -29,9 +29,9 @@ import java.util.Locale;
|
|||
*/
|
||||
public interface CircuitBreaker {
|
||||
|
||||
public static final String PARENT = "PARENT";
|
||||
public static final String FIELDDATA = "FIELDDATA";
|
||||
public static final String REQUEST = "REQUEST";
|
||||
public static final String PARENT = "parent";
|
||||
public static final String FIELDDATA = "fielddata";
|
||||
public static final String REQUEST = "request";
|
||||
|
||||
public static enum Type {
|
||||
// A regular or child MemoryCircuitBreaker
|
||||
|
|
|
@ -30,8 +30,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static com.google.common.collect.Lists.newArrayList;
|
||||
|
@ -42,7 +42,7 @@ import static com.google.common.collect.Lists.newArrayList;
|
|||
*/
|
||||
public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
||||
|
||||
private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap();
|
||||
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap();
|
||||
|
||||
// Old pre-1.4.0 backwards compatible settings
|
||||
public static final String OLD_CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
|
||||
|
@ -220,7 +220,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
long parentLimit = this.parentSettings.getLimit();
|
||||
if (totalUsed > parentLimit) {
|
||||
this.parentTripCount.incrementAndGet();
|
||||
throw new CircuitBreakingException("[PARENT] Data too large, data for [" +
|
||||
throw new CircuitBreakingException("[parent] Data too large, data for [" +
|
||||
label + "] would be larger than limit of [" +
|
||||
parentLimit + "/" + new ByteSizeValue(parentLimit) + "]",
|
||||
totalUsed, parentLimit);
|
||||
|
@ -238,15 +238,27 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
// Validate the settings
|
||||
validateSettings(new BreakerSettings[] {breakerSettings});
|
||||
|
||||
CircuitBreaker breaker;
|
||||
if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) {
|
||||
breaker = new NoopCircuitBreaker(breakerSettings.getName());
|
||||
CircuitBreaker breaker = new NoopCircuitBreaker(breakerSettings.getName());
|
||||
breakers.put(breakerSettings.getName(), breaker);
|
||||
} else {
|
||||
CircuitBreaker oldBreaker = breakers.get(breakerSettings.getName());
|
||||
CircuitBreaker oldBreaker;
|
||||
CircuitBreaker breaker = new ChildMemoryCircuitBreaker(breakerSettings,
|
||||
logger, this, breakerSettings.getName());
|
||||
|
||||
for (;;) {
|
||||
oldBreaker = breakers.putIfAbsent(breakerSettings.getName(), breaker);
|
||||
if (oldBreaker == null) {
|
||||
return;
|
||||
}
|
||||
breaker = new ChildMemoryCircuitBreaker(breakerSettings,
|
||||
(ChildMemoryCircuitBreaker)oldBreaker, logger, this, breakerSettings.getName());
|
||||
|
||||
if (breakers.replace(breakerSettings.getName(), oldBreaker, breaker)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
breakers.put(breakerSettings.getName(), breaker);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,8 +27,11 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
|
|||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -236,7 +239,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
|||
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
|
||||
fail("should have thrown an exception");
|
||||
} catch (Exception e) {
|
||||
String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]";
|
||||
String errMsg = "[fielddata] Data too large, data for [test] would be larger than limit of [10/10b]";
|
||||
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
|
||||
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
|
||||
}
|
||||
|
@ -258,7 +261,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
|||
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
|
||||
fail("should have thrown an exception");
|
||||
} catch (Exception e) {
|
||||
String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]";
|
||||
String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [15/15b]";
|
||||
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
|
||||
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
|
||||
}
|
||||
|
@ -292,7 +295,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
|||
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get();
|
||||
fail("aggregation should have tripped the breaker");
|
||||
} catch (Exception e) {
|
||||
String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
|
||||
String errMsg = "CircuitBreakingException[[request] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
|
||||
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
|
||||
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
|
||||
}
|
||||
|
@ -314,4 +317,34 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
|||
}
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomCircuitBreakerRegistration() throws Exception {
|
||||
Iterable<CircuitBreakerService> serviceIter = internalCluster().getInstances(CircuitBreakerService.class);
|
||||
|
||||
final String breakerName = "customBreaker";
|
||||
BreakerSettings breakerSettings = new BreakerSettings(breakerName, 8, 1.03);
|
||||
CircuitBreaker breaker = null;
|
||||
|
||||
for (CircuitBreakerService s : serviceIter) {
|
||||
s.registerBreaker(breakerSettings);
|
||||
breaker = s.getBreaker(breakerSettings.getName());
|
||||
}
|
||||
|
||||
if (breaker != null) {
|
||||
try {
|
||||
breaker.addEstimateBytesAndMaybeBreak(16, "test");
|
||||
} catch (CircuitBreakingException e) {
|
||||
// ignore, we forced a circuit break
|
||||
}
|
||||
}
|
||||
|
||||
NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().clear().setBreaker(true).get();
|
||||
int breaks = 0;
|
||||
for (NodeStats stat : stats.getNodes()) {
|
||||
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(breakerName);
|
||||
breaks += breakerStats.getTrippedCount();
|
||||
}
|
||||
assertThat(breaks, greaterThanOrEqualTo(1));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue