apply review comments from @nik9000
This commit is contained in:
parent
5d7f4ef394
commit
6c7e5069d4
|
@ -74,18 +74,12 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||||
*/
|
*/
|
||||||
public class BalancedShardsAllocator extends AbstractComponent implements ShardsAllocator {
|
public class BalancedShardsAllocator extends AbstractComponent implements ShardsAllocator {
|
||||||
|
|
||||||
|
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, true, Setting.Scope.Cluster);
|
||||||
|
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, true, Setting.Scope.Cluster);
|
||||||
|
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, true, Setting.Scope.Cluster);
|
||||||
|
|
||||||
public static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.55f;
|
private volatile WeightFunction weightFunction;
|
||||||
public static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f;
|
private volatile float threshold;
|
||||||
public static final float DEFAULT_THRESHOLD = 1.0f;
|
|
||||||
|
|
||||||
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", DEFAULT_INDEX_BALANCE_FACTOR, true, Setting.Scope.Cluster);
|
|
||||||
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", DEFAULT_SHARD_BALANCE_FACTOR, true, Setting.Scope.Cluster);
|
|
||||||
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", DEFAULT_THRESHOLD, true, Setting.Scope.Cluster);
|
|
||||||
|
|
||||||
private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR);
|
|
||||||
|
|
||||||
private volatile float threshold = DEFAULT_THRESHOLD;
|
|
||||||
|
|
||||||
public BalancedShardsAllocator(Settings settings) {
|
public BalancedShardsAllocator(Settings settings) {
|
||||||
this(settings, new ClusterSettingsService(settings, new ClusterSettings(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
this(settings, new ClusterSettingsService(settings, new ClusterSettings(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||||
|
@ -94,26 +88,18 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
@Inject
|
@Inject
|
||||||
public BalancedShardsAllocator(Settings settings, ClusterSettingsService clusterSettingsService) {
|
public BalancedShardsAllocator(Settings settings, ClusterSettingsService clusterSettingsService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
setIndexBalance(INDEX_BALANCE_FACTOR_SETTING.get(settings));
|
weightFunction = new WeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
|
||||||
setShardBalance(SHARD_BALANCE_FACTOR_SETTING.get(settings));
|
|
||||||
setThreshold(THRESHOLD_SETTING.get(settings));
|
setThreshold(THRESHOLD_SETTING.get(settings));
|
||||||
clusterSettingsService.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalance);
|
clusterSettingsService.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
|
||||||
clusterSettingsService.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalance);
|
|
||||||
clusterSettingsService.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
|
clusterSettingsService.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setIndexBalance(float indexBalance) {
|
public void setWeightFunction(float indexBalance, float shardBalanceFactor) {
|
||||||
weightFunction = new WeightFunction(indexBalance, weightFunction.shardBalance);
|
weightFunction = new WeightFunction(indexBalance, weightFunction.shardBalance);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setShardBalance(float shardBalanceFactor) {
|
|
||||||
weightFunction = new WeightFunction(weightFunction.indexBalance, shardBalanceFactor);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setThreshold(float threshold) {
|
public void setThreshold(float threshold) {
|
||||||
if (threshold <= 0.0f) {
|
|
||||||
throw new IllegalArgumentException("threshold must be greater than 0.0f but was: " + threshold);
|
|
||||||
}
|
|
||||||
this.threshold = threshold;
|
this.threshold = threshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
|
|
||||||
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING = Setting.boolSetting("cluster.routing.allocation.disk.threshold_enabled", true, true, Setting.Scope.Cluster);
|
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING = Setting.boolSetting("cluster.routing.allocation.disk.threshold_enabled", true, true, Setting.Scope.Cluster);
|
||||||
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = new Setting<>("cluster.routing.allocation.disk.watermark.low", "_na_", "85%", (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"), true, Setting.Scope.Cluster);
|
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = new Setting<>("cluster.routing.allocation.disk.watermark.low", "_na_", "85%", (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"), true, Setting.Scope.Cluster);
|
||||||
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING = new Setting<>("cluster.routing.allocation.disk.watermark.high", "_na_", "90%", (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"), true, Setting.Scope.Cluster);
|
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING = new Setting<>("cluster.routing.allocation.disk.watermark.high", "_na_", "90%", (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"), true, Setting.Scope.Cluster);
|
||||||
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING = Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true, true, Setting.Scope.Cluster);;
|
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING = Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true, true, Setting.Scope.Cluster);;
|
||||||
public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60), true, Setting.Scope.Cluster);
|
public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60), true, Setting.Scope.Cluster);
|
||||||
|
|
||||||
|
|
|
@ -241,7 +241,6 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
if (accept.test(inst) == false) {
|
if (accept.test(inst) == false) {
|
||||||
throw new IllegalArgumentException("illegal value can't update [" + key + "] from [" + value + "] to [" + getRaw(settings) + "]");
|
throw new IllegalArgumentException("illegal value can't update [" + key + "] from [" + value + "] to [" + getRaw(settings) + "]");
|
||||||
}
|
}
|
||||||
logger.info("update [{}] from [{}] to [{}]", key, value, getRaw(settings));
|
|
||||||
pendingValue = newValue;
|
pendingValue = newValue;
|
||||||
valueInstance = inst;
|
valueInstance = inst;
|
||||||
commitPending = true;
|
commitPending = true;
|
||||||
|
@ -254,6 +253,7 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
|
|
||||||
public void apply() {
|
public void apply() {
|
||||||
if (commitPending) {
|
if (commitPending) {
|
||||||
|
logger.info("update [{}] from [{}] to [{}]", key, value, pendingValue);
|
||||||
value = pendingValue;
|
value = pendingValue;
|
||||||
consumer.accept(valueInstance);
|
consumer.accept(valueInstance);
|
||||||
}
|
}
|
||||||
|
@ -283,6 +283,16 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
return new Setting<>(key, "_na_", (s) -> Float.toString(defaultValue), Float::parseFloat, dynamic, scope);
|
return new Setting<>(key, "_na_", (s) -> Float.toString(defaultValue), Float::parseFloat, dynamic, scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Setting<Float> floatSetting(String key, float defaultValue, float minValue, boolean dynamic, Scope scope) {
|
||||||
|
return new Setting<>(key, "_na_", (s) -> Float.toString(defaultValue), (s) -> {
|
||||||
|
float value = Float.parseFloat(s);
|
||||||
|
if (value < minValue) {
|
||||||
|
throw new ElasticsearchParseException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}, dynamic, scope);
|
||||||
|
}
|
||||||
|
|
||||||
public static Setting<Integer> intSetting(String key, int defaultValue, boolean dynamic, Scope scope) {
|
public static Setting<Integer> intSetting(String key, int defaultValue, boolean dynamic, Scope scope) {
|
||||||
return new Setting<>(key, "_na_", (s) -> Integer.toString(defaultValue), Integer::parseInt, dynamic, scope);
|
return new Setting<>(key, "_na_", (s) -> Integer.toString(defaultValue), Integer::parseInt, dynamic, scope);
|
||||||
}
|
}
|
||||||
|
@ -304,7 +314,9 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Setting<Settings> groupSetting(String key, boolean dynamic, Scope scope) {
|
public static Setting<Settings> groupSetting(String key, boolean dynamic, Scope scope) {
|
||||||
String prefix = key.endsWith(".") ? key : key + ".";
|
if (key.endsWith(".") == false) {
|
||||||
|
throw new IllegalArgumentException("key must end with a '.'");
|
||||||
|
}
|
||||||
return new Setting<Settings>(key, "_na_", "", (s) -> null, dynamic, scope) {
|
return new Setting<Settings>(key, "_na_", "", (s) -> null, dynamic, scope) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -314,12 +326,12 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Settings get(Settings settings) {
|
public Settings get(Settings settings) {
|
||||||
return settings.getByPrefix(prefix);
|
return settings.getByPrefix(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean match(String toTest) {
|
public boolean match(String toTest) {
|
||||||
return Regex.simpleMatch(prefix + "*", toTest);
|
return Regex.simpleMatch(key + "*", toTest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -387,8 +399,14 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
return new Setting<>(key, "_na_", (s) -> defaultValue.toString(), (s) -> TimeValue.parseTimeValue(s, defaultValue, key), dynamic, scope);
|
return new Setting<>(key, "_na_", (s) -> defaultValue.toString(), (s) -> TimeValue.parseTimeValue(s, defaultValue, key), dynamic, scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Setting<Double> nonNegativeDouble(String key, double defaultValue, boolean dynamic, Scope scope) {
|
public static Setting<Double> doubleSetting(String key, double defaultValue, double minValue, boolean dynamic, Scope scope) {
|
||||||
return new Setting<>(key, "_na_", (s) -> Double.toString(defaultValue), Double::parseDouble, dynamic, scope);
|
return new Setting<>(key, "_na_", (s) -> Double.toString(defaultValue), (s) -> {
|
||||||
|
final double d = Double.parseDouble(s);
|
||||||
|
if (d < minValue) {
|
||||||
|
throw new ElasticsearchParseException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
|
||||||
|
}
|
||||||
|
return d;
|
||||||
|
}, dynamic, scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ public abstract class SettingsService extends AbstractComponent {
|
||||||
try {
|
try {
|
||||||
settingUpdater.rollback();
|
settingUpdater.rollback();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to rollback settings for [{}]", e, settingUpdater);
|
logger.error("failed to rollback settings for [{}]", e, settingUpdater);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,15 +77,15 @@ public abstract class SettingsService extends AbstractComponent {
|
||||||
* Applies the given settings to all the settings consumers or to none of them. The settings
|
* Applies the given settings to all the settings consumers or to none of them. The settings
|
||||||
* will be merged with the node settings before they are applied while given settings override existing node
|
* will be merged with the node settings before they are applied while given settings override existing node
|
||||||
* settings.
|
* settings.
|
||||||
* @param settings the settings to apply
|
* @param newSettings the settings to apply
|
||||||
* @return the unmerged applied settings
|
* @return the unmerged applied settings
|
||||||
*/
|
*/
|
||||||
public synchronized Settings applySettings(Settings settings) {
|
public synchronized Settings applySettings(Settings newSettings) {
|
||||||
if (lastSettingsApplied != null && settings.equals(lastSettingsApplied)) {
|
if (lastSettingsApplied != null && newSettings.equals(lastSettingsApplied)) {
|
||||||
// nothing changed in the settings, ignore
|
// nothing changed in the settings, ignore
|
||||||
return settings;
|
return newSettings;
|
||||||
}
|
}
|
||||||
final Settings build = Settings.builder().put(this.settings).put(settings).build();
|
final Settings build = Settings.builder().put(this.settings).put(newSettings).build();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
for (SettingUpdater settingUpdater : settingUpdaters) {
|
for (SettingUpdater settingUpdater : settingUpdaters) {
|
||||||
|
@ -109,14 +109,14 @@ public abstract class SettingsService extends AbstractComponent {
|
||||||
try {
|
try {
|
||||||
settingUpdater.rollback();
|
settingUpdater.rollback();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to refresh settings for [{}]", e, settingUpdater);
|
logger.error("failed to refresh settings for [{}]", e, settingUpdater);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
|
for (Map.Entry<String, String> entry : newSettings.getAsMap().entrySet()) {
|
||||||
if (entry.getKey().startsWith("logger.")) {
|
if (entry.getKey().startsWith("logger.")) {
|
||||||
String component = entry.getKey().substring("logger.".length());
|
String component = entry.getKey().substring("logger.".length());
|
||||||
if ("_root".equals(component)) {
|
if ("_root".equals(component)) {
|
||||||
|
@ -130,7 +130,7 @@ public abstract class SettingsService extends AbstractComponent {
|
||||||
logger.warn("failed to refresh settings for [{}]", e, "logger");
|
logger.warn("failed to refresh settings for [{}]", e, "logger");
|
||||||
}
|
}
|
||||||
|
|
||||||
return lastSettingsApplied = settings;
|
return lastSettingsApplied = newSettings;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -49,11 +49,11 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
||||||
public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.total.limit", "70%", true, Setting.Scope.Cluster);
|
public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.total.limit", "70%", true, Setting.Scope.Cluster);
|
||||||
|
|
||||||
public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.fielddata.limit", "60%", true, Setting.Scope.Cluster);
|
public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.fielddata.limit", "60%", true, Setting.Scope.Cluster);
|
||||||
public static final Setting<Double> FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.nonNegativeDouble("indices.breaker.fielddata.overhead", 1.03d, true, Setting.Scope.Cluster);
|
public static final Setting<Double> FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.doubleSetting("indices.breaker.fielddata.overhead", 1.03d, 0.0d, true, Setting.Scope.Cluster);
|
||||||
public static final String FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.fielddata.type";
|
public static final String FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.fielddata.type";
|
||||||
|
|
||||||
public static final Setting<ByteSizeValue> REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.request.limit", "40%", true, Setting.Scope.Cluster);
|
public static final Setting<ByteSizeValue> REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.request.limit", "40%", true, Setting.Scope.Cluster);
|
||||||
public static final Setting<Double> REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.nonNegativeDouble("indices.breaker.request.overhead", 1.0d, true, Setting.Scope.Cluster);
|
public static final Setting<Double> REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.doubleSetting("indices.breaker.request.overhead", 1.0d, 0.0d, true, Setting.Scope.Cluster);
|
||||||
public static final String REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.request.type";
|
public static final String REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.request.type";
|
||||||
|
|
||||||
public static final String DEFAULT_BREAKER_TYPE = "memory";
|
public static final String DEFAULT_BREAKER_TYPE = "memory";
|
||||||
|
|
|
@ -164,12 +164,12 @@ public class TimeValueTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testToStringRep() {
|
public void testToStringRep() {
|
||||||
assertThat("-1", equalTo(new TimeValue(-1).getStringRep()));
|
assertEquals("-1", new TimeValue(-1).getStringRep());
|
||||||
assertThat("10ms", equalTo(new TimeValue(10, TimeUnit.MILLISECONDS).getStringRep()));
|
assertEquals("10ms", new TimeValue(10, TimeUnit.MILLISECONDS).getStringRep());
|
||||||
assertThat("1533ms", equalTo(new TimeValue(1533, TimeUnit.MILLISECONDS).getStringRep()));
|
assertEquals("1533ms", new TimeValue(1533, TimeUnit.MILLISECONDS).getStringRep());
|
||||||
assertThat("90s", equalTo(new TimeValue(90, TimeUnit.SECONDS).getStringRep()));
|
assertEquals("90s", new TimeValue(90, TimeUnit.SECONDS).getStringRep());
|
||||||
assertThat("90m", equalTo(new TimeValue(90, TimeUnit.MINUTES).getStringRep()));
|
assertEquals("90m", new TimeValue(90, TimeUnit.MINUTES).getStringRep());
|
||||||
assertThat("36h", equalTo(new TimeValue(36, TimeUnit.HOURS).getStringRep()));
|
assertEquals("36h", new TimeValue(36, TimeUnit.HOURS).getStringRep());
|
||||||
assertThat("1000d", equalTo(new TimeValue(1000, TimeUnit.DAYS).getStringRep()));
|
assertEquals("1000d", new TimeValue(1000, TimeUnit.DAYS).getStringRep());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue