Allow to configure indices.fielddata.breaker.limit with a ratio of the heap size.
Close #4616
This commit is contained in:
parent
8f96930638
commit
728a5647c3
|
@ -27,10 +27,10 @@ import org.elasticsearch.cluster.routing.allocation.decider.*;
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||||
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
|
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
|
||||||
|
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
|
||||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||||
import org.elasticsearch.indices.store.IndicesStore;
|
import org.elasticsearch.indices.store.IndicesStore;
|
||||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||||
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -78,7 +78,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
|
||||||
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
|
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
|
||||||
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
|
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
|
||||||
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
|
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
|
||||||
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.BYTES_SIZE);
|
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.MEMORY_SIZE);
|
||||||
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
|
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
|
||||||
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
|
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.common.Booleans;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
|
||||||
import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue;
|
import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue;
|
||||||
|
import static org.elasticsearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validates a setting, returning a failure message if applicable.
|
* Validates a setting, returning a failure message if applicable.
|
||||||
|
@ -184,7 +186,19 @@ public interface Validator {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
public static final Validator MEMORY_SIZE = new Validator() {
|
||||||
|
@Override
|
||||||
|
public String validate(String setting, String value) {
|
||||||
|
try {
|
||||||
|
parseBytesSizeValueOrHeapRatio(value);
|
||||||
|
} catch (ElasticsearchParseException ex) {
|
||||||
|
return ex.getMessage();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
public static final Validator BOOLEAN = new Validator() {
|
public static final Validator BOOLEAN = new Validator() {
|
||||||
@Override
|
@Override
|
||||||
public String validate(String setting, String value) {
|
public String validate(String setting, String value) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.common.unit;
|
package org.elasticsearch.common.unit;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
|
|
||||||
import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue;
|
import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue;
|
||||||
|
@ -32,8 +33,16 @@ public enum MemorySizeValue {
|
||||||
* the heap is 1G, <tt>10%</tt> will be parsed as <tt>100mb</tt>. */
|
* the heap is 1G, <tt>10%</tt> will be parsed as <tt>100mb</tt>. */
|
||||||
public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) {
|
public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) {
|
||||||
if (sValue.endsWith("%")) {
|
if (sValue.endsWith("%")) {
|
||||||
double percent = Double.parseDouble(sValue.substring(0, sValue.length() - 1));
|
final String percentAsString = sValue.substring(0, sValue.length() - 1);
|
||||||
return new ByteSizeValue((long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes()), ByteSizeUnit.BYTES);
|
try {
|
||||||
|
final double percent = Double.parseDouble(percentAsString);
|
||||||
|
if (percent < 0 || percent > 100) {
|
||||||
|
throw new ElasticsearchParseException("Percentage should be in [0-100], got " + percentAsString);
|
||||||
|
}
|
||||||
|
return new ByteSizeValue((long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes()), ByteSizeUnit.BYTES);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw new ElasticsearchParseException("Failed to parse [" + percentAsString + "] as a double", e);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return parseBytesSizeValue(sValue);
|
return parseBytesSizeValue(sValue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
|
||||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,9 +38,7 @@ public class InternalCircuitBreakerService extends AbstractLifecycleComponent<In
|
||||||
public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";
|
public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";
|
||||||
|
|
||||||
public static final double DEFAULT_OVERHEAD_CONSTANT = 1.03;
|
public static final double DEFAULT_OVERHEAD_CONSTANT = 1.03;
|
||||||
|
private static final String DEFAULT_BREAKER_LIMIT = "80%";
|
||||||
private static final long JVM_HEAP_MAX_BYTES = JvmInfo.jvmInfo().getMem().getHeapMax().bytes();
|
|
||||||
private static final long DEFAULT_BREAKER_LIMIT = (long) (0.8 * JVM_HEAP_MAX_BYTES); // 80% of the max heap
|
|
||||||
|
|
||||||
private volatile MemoryCircuitBreaker breaker;
|
private volatile MemoryCircuitBreaker breaker;
|
||||||
private volatile long maxBytes;
|
private volatile long maxBytes;
|
||||||
|
@ -50,7 +47,7 @@ public class InternalCircuitBreakerService extends AbstractLifecycleComponent<In
|
||||||
@Inject
|
@Inject
|
||||||
public InternalCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
|
public InternalCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.maxBytes = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, new ByteSizeValue(DEFAULT_BREAKER_LIMIT)).bytes();
|
this.maxBytes = settings.getAsMemory(CIRCUIT_BREAKER_MAX_BYTES_SETTING, DEFAULT_BREAKER_LIMIT).bytes();
|
||||||
this.overhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, DEFAULT_OVERHEAD_CONSTANT);
|
this.overhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, DEFAULT_OVERHEAD_CONSTANT);
|
||||||
|
|
||||||
this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, null, logger);
|
this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, null, logger);
|
||||||
|
@ -62,13 +59,13 @@ public class InternalCircuitBreakerService extends AbstractLifecycleComponent<In
|
||||||
@Override
|
@Override
|
||||||
public void onRefreshSettings(Settings settings) {
|
public void onRefreshSettings(Settings settings) {
|
||||||
// clear breaker now that settings have changed
|
// clear breaker now that settings have changed
|
||||||
ByteSizeValue newMaxByteSizeValue = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, null);
|
long newMaxByteSizeValue = settings.getAsMemory(CIRCUIT_BREAKER_MAX_BYTES_SETTING, DEFAULT_BREAKER_LIMIT).bytes();
|
||||||
boolean breakerResetNeeded = false;
|
boolean breakerResetNeeded = false;
|
||||||
|
|
||||||
if (newMaxByteSizeValue != null) {
|
if (newMaxByteSizeValue != maxBytes) {
|
||||||
logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_MAX_BYTES_SETTING,
|
logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_MAX_BYTES_SETTING,
|
||||||
new ByteSizeValue(InternalCircuitBreakerService.this.maxBytes), newMaxByteSizeValue);
|
new ByteSizeValue(InternalCircuitBreakerService.this.maxBytes), newMaxByteSizeValue);
|
||||||
InternalCircuitBreakerService.this.maxBytes = newMaxByteSizeValue.bytes();
|
maxBytes = newMaxByteSizeValue;
|
||||||
breakerResetNeeded = true;
|
breakerResetNeeded = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +73,7 @@ public class InternalCircuitBreakerService extends AbstractLifecycleComponent<In
|
||||||
if (newOverhead != overhead) {
|
if (newOverhead != overhead) {
|
||||||
logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||||
overhead, newOverhead);
|
overhead, newOverhead);
|
||||||
InternalCircuitBreakerService.this.overhead = newOverhead;
|
overhead = newOverhead;
|
||||||
breakerResetNeeded = true;
|
breakerResetNeeded = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,10 +23,13 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.common.collect.MapBuilder;
|
import org.elasticsearch.common.collect.MapBuilder;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
|
||||||
|
@ -36,6 +39,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
|
||||||
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST)
|
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST)
|
||||||
public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
|
private String randomRidiculouslySmallLimit() {
|
||||||
|
// 3 different ways to say 100 bytes
|
||||||
|
return randomFrom(Arrays.asList("100b", "100", (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
|
@TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
|
||||||
public void testMemoryBreaker() {
|
public void testMemoryBreaker() {
|
||||||
|
@ -63,7 +71,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
// Update circuit breaker settings
|
// Update circuit breaker settings
|
||||||
Settings settings = settingsBuilder()
|
Settings settings = settingsBuilder()
|
||||||
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b")
|
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit())
|
||||||
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
|
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
|
||||||
.build();
|
.build();
|
||||||
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
|
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
|
||||||
|
@ -120,7 +128,7 @@ public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
// Update circuit breaker settings
|
// Update circuit breaker settings
|
||||||
Settings settings = settingsBuilder()
|
Settings settings = settingsBuilder()
|
||||||
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b")
|
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit())
|
||||||
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
|
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
|
||||||
.build();
|
.build();
|
||||||
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
|
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
|
||||||
|
|
Loading…
Reference in New Issue