[TESTS] fix circuit breaker tests for remote clusters and bwc

Adds additional version checks in NodeStats for older versions

When using an external cluster (backwards compatibility tests), the act
of checking the request breaker requires a network buffer, which
increments the breaker. This change only checks the request breaker in
InternalTestCluster and uses Guice to retrieve it instead of
a (possible) network request.

Also removed the now unused InternalCircuitBreakerService class
This commit is contained in:
Lee Hinman 2014-07-28 14:11:00 +02:00
parent 4e5ad568bb
commit a93ee599d3
10 changed files with 99 additions and 153 deletions

View File

@ -19,9 +19,11 @@
package org.elasticsearch.action.admin.cluster.node.stats;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.nodes.NodeOperationResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@ -29,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
@ -215,7 +218,16 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
if (in.readBoolean()) {
http = HttpStats.readHttpStats(in);
}
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
} else {
// If 1.3.0 or earlier, only a single CircuitBreakerStats can be read
CircuitBreakerStats fdStats = CircuitBreakerStats.readOptionalCircuitBreakerStats(in);
CircuitBreakerStats reqStats = new CircuitBreakerStats(CircuitBreaker.Name.REQUEST, 0, 0, 1.0, -1);
CircuitBreakerStats parentStats = new CircuitBreakerStats(CircuitBreaker.Name.PARENT, 0, 0, 1.0, -1);
breaker = new AllCircuitBreakerStats(new CircuitBreakerStats[] {parentStats, fdStats, reqStats});
}
}
@Override
@ -276,7 +288,12 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
out.writeBoolean(true);
http.writeTo(out);
}
out.writeOptionalStreamable(breaker);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeOptionalStreamable(breaker);
} else {
// Writing to a 1.3.0 or earlier stream expects only a single breaker stats
out.writeOptionalStreamable(breaker == null ? null : breaker.getStats(CircuitBreaker.Name.FIELDDATA));
}
}
@Override

View File

@ -30,7 +30,6 @@ import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -82,8 +81,6 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);

View File

@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
/** Utility class to work with arrays. */
public class BigArrays extends AbstractComponent {
public static final String MAX_SIZE_IN_BYTES_SETTING = "requests.memory.breaker.limit";
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, null);
/** Page size in bytes: 16KB */

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.settings.Settings;
* Interface for Circuit Breaker services, which provide breakers to classes
* that load field data.
*/
public abstract class CircuitBreakerService extends AbstractLifecycleComponent<InternalCircuitBreakerService> {
public abstract class CircuitBreakerService extends AbstractLifecycleComponent<CircuitBreakerService> {
protected CircuitBreakerService(Settings settings) {
super(settings);

View File

@ -44,6 +44,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
private volatile ImmutableMap<CircuitBreaker.Name, CircuitBreaker> breakers;
// Old pre-1.4.0 backwards compatible settings
public static final String OLD_CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
public static final String OLD_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";
public static final String TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.total.limit";
public static final String DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT = "70%";
@ -60,6 +64,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings requestSettings;
// Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0);
@ -68,19 +73,19 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
super(settings);
// This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING
// setting to keep backwards compatibility with 1.2, it can be safely
// removed when compatibility with 1.2 is no longer needed
// setting to keep backwards compatibility with 1.3, it can be safely
// removed when compatibility with 1.3 is no longer needed
String compatibilityFielddataLimitDefault = DEFAULT_FIELDDATA_BREAKER_LIMIT;
ByteSizeValue compatibilityFielddataLimit = settings.getAsMemory(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, null);
ByteSizeValue compatibilityFielddataLimit = settings.getAsMemory(OLD_CIRCUIT_BREAKER_MAX_BYTES_SETTING, null);
if (compatibilityFielddataLimit != null) {
compatibilityFielddataLimitDefault = compatibilityFielddataLimit.toString();
}
// This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING
// setting to keep backwards compatibility with 1.2, it can be safely
// removed when compatibility with 1.2 is no longer needed
// setting to keep backwards compatibility with 1.3, it can be safely
// removed when compatibility with 1.3 is no longer needed
double compatibilityFielddataOverheadDefault = DEFAULT_FIELDDATA_OVERHEAD_CONSTANT;
Double compatibilityFielddataOverhead = settings.getAsDouble(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
Double compatibilityFielddataOverhead = settings.getAsDouble(OLD_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
if (compatibilityFielddataOverhead != null) {
compatibilityFielddataOverheadDefault = compatibilityFielddataOverhead;
}

View File

@ -1,120 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
/**
* The InternalCircuitBreakerService handles providing
* {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker}s
* that can be used to keep track of memory usage across the node, preventing
* actions that could cause an {@link OutOfMemoryError} on the node.
*/
public class InternalCircuitBreakerService extends CircuitBreakerService {
public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";
public static final double DEFAULT_OVERHEAD_CONSTANT = 1.03;
private static final String DEFAULT_BREAKER_LIMIT = "60%";
private volatile MemoryCircuitBreaker breaker;
private volatile long maxBytes;
private volatile double overhead;
@Inject
public InternalCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
this.maxBytes = settings.getAsMemory(CIRCUIT_BREAKER_MAX_BYTES_SETTING, DEFAULT_BREAKER_LIMIT).bytes();
this.overhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, DEFAULT_OVERHEAD_CONSTANT);
this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, null, logger);
nodeSettingsService.addListener(new ApplySettings());
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
// clear breaker now that settings have changed
long newMaxByteSizeValue = settings.getAsMemory(CIRCUIT_BREAKER_MAX_BYTES_SETTING, Long.toString(maxBytes)).bytes();
boolean breakerResetNeeded = false;
if (newMaxByteSizeValue != maxBytes) {
logger.info("updating [{}] from [{}]({}) to [{}]({})", CIRCUIT_BREAKER_MAX_BYTES_SETTING,
InternalCircuitBreakerService.this.maxBytes, new ByteSizeValue(InternalCircuitBreakerService.this.maxBytes),
newMaxByteSizeValue, new ByteSizeValue(newMaxByteSizeValue));
maxBytes = newMaxByteSizeValue;
breakerResetNeeded = true;
}
double newOverhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, overhead);
if (newOverhead != overhead) {
logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_OVERHEAD_SETTING,
overhead, newOverhead);
overhead = newOverhead;
breakerResetNeeded = true;
}
if (breakerResetNeeded) {
resetBreaker();
}
}
}
/**
* @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage
*/
public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) {
// Return the only breaker, regardless of name
return this.breaker;
}
/**
* Reset the breaker, creating a new one and initializing its used value
* to the actual field data usage, or the existing estimated usage if the
* actual value is not available. Will not trip the breaker even if the
* used value is higher than the limit for the breaker.
*/
public synchronized void resetBreaker() {
final MemoryCircuitBreaker oldBreaker = this.breaker;
// discard old breaker by creating a new one and pre-populating from the current breaker
this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, oldBreaker, logger);
}
@Override
public AllCircuitBreakerStats stats() {
return new AllCircuitBreakerStats(new CircuitBreakerStats[]{
new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(),
breaker.getOverhead(), breaker.getTrippedCount())
});
}
@Override
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
// There is only a single breaker, so always return it
return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(),
breaker.getOverhead(), breaker.getTrippedCount());
}
}

View File

@ -23,10 +23,13 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -37,6 +40,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* A test cluster implementation that holds a fixed set of external nodes as well as a InternalTestCluster
* which is used to run mixed version clusters in tests like backwards compatibility tests.
@ -220,6 +226,21 @@ public class CompositeTestCluster extends TestCluster {
}
}
@Override
public void ensureEstimatedStats() {
if (size() > 0) {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
// CompositeTestCluster does not check the request breaker,
// because checking it requires a network request, which in
// turn increments the breaker, making it non-0
}
}
@Override
public boolean hasFilterCache() {
return true;

View File

@ -22,8 +22,11 @@ package org.elasticsearch.test;
import com.google.common.collect.Lists;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -34,6 +37,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* External cluster to run the tests against.
* It is a pure immutable test cluster that allows to send requests to a pre-existing cluster
@ -110,6 +116,21 @@ public final class ExternalTestCluster extends TestCluster {
client.close();
}
@Override
public void ensureEstimatedStats() {
if (size() > 0) {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
// ExternalTestCluster does not check the request breaker,
// because checking it requires a network request, which in
// turn increments the breaker, making it non-0
}
}
}
@Override
public Iterator<Client> iterator() {
return Lists.newArrayList(client).iterator();

View File

@ -45,6 +45,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
@ -66,6 +67,7 @@ import org.elasticsearch.index.cache.filter.none.NoneFilterCache;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.plugins.PluginsService;
@ -100,6 +102,8 @@ import static org.apache.lucene.util.LuceneTestCase.usually;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* InternalTestCluster manages a set of JVM private nodes and allows convenient access to them.
@ -1503,4 +1507,21 @@ public final class InternalTestCluster extends TestCluster {
return defaultSettings;
}
@Override
public void ensureEstimatedStats() {
if (size() > 0) {
// Checks that the breakers have been reset without incurring a
// network request, because a network request can increment one
// of the breakers
for (NodeAndClient nodeAndClient : nodes.values()) {
String name = nodeAndClient.name;
CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);
CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA);
CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.Name.REQUEST);
assertThat("Fielddata breaker not reset to 0 on node: " + name, fdBreaker.getUsed(), equalTo(0L));
assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L));
}
}
}
}

View File

@ -21,12 +21,9 @@ package org.elasticsearch.test;
import com.carrotsearch.hppc.ObjectArrayList;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;
@ -39,8 +36,6 @@ import java.net.InetSocketAddress;
import java.util.Random;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* Base test cluster that exposes the basis to run tests against any elasticsearch cluster, whose layout
@ -186,22 +181,12 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
}
/**
* Ensures that the breaker statistics are reset to 0 since we wiped all indices and that
* means all stats should be set to 0 otherwise something is wrong with the field data
* calculation.
* Ensures that any breaker statistics are reset to 0.
*
* The implementation is specific to the test cluster, because the act of
* checking some breaker stats can increase them.
*/
public void ensureEstimatedStats() {
if (size() > 0) {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
assertThat("Request breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.REQUEST).getEstimated(), equalTo(0L));
}
}
}
public abstract void ensureEstimatedStats();
/**
* Return whether or not this cluster can cache filters.