apply feedback from @bleskes

Simon Willnauer 2015-12-15 16:20:58 +01:00
parent 8c7e142eb0
commit 4f445688dd
14 changed files with 56 additions and 40 deletions

View File

@ -79,7 +79,7 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
}
}
private ClusterRebalanceType type;
private volatile ClusterRebalanceType type;
@Inject
public ClusterRebalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
@ -95,7 +95,7 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
}
public void setType(ClusterRebalanceType type) {
private void setType(ClusterRebalanceType type) {
this.type = type;
}
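
A minimal sketch (the setting key and value here are assumptions, not shown in this hunk) of how the now-private setter is still exercised: ClusterSettings invokes the consumer registered above whenever the dynamic setting changes, which is also why the field becomes volatile — the update runs on the settings-apply path while allocation deciders read it from other threads.

    // hypothetical update; ClusterSettings calls this::setType with the parsed ClusterRebalanceType
    clusterSettings.applySettings(Settings.builder()
        .put("cluster.routing.allocation.allow_rebalance", "indices_all_active")
        .build());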

View File

@ -42,7 +42,7 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
public static final String NAME = "concurrent_rebalance";
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING = Setting.intSetting("cluster.routing.allocation.cluster_concurrent_rebalance", 2, true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING = Setting.intSetting("cluster.routing.allocation.cluster_concurrent_rebalance", 2, 0, true, Setting.Scope.CLUSTER);
private volatile int clusterConcurrentRebalance;
@Inject
@ -53,7 +53,7 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance);
}
public void setClusterConcurrentRebalance(int concurrentRebalance) {
private void setClusterConcurrentRebalance(int concurrentRebalance) {
clusterConcurrentRebalance = concurrentRebalance;
}
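
A short sketch, assuming the bounded intSetting factory introduced in Setting.java below: with the new lower bound of 0, a negative value no longer parses.

    // hypothetical: rejected by the 0 lower bound with an IllegalArgumentException
    ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING
        .get(Settings.builder().put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());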

View File

@ -550,7 +550,7 @@ public class DiskThresholdDecider extends AllocationDecider {
/**
* Checks if a watermark string is a valid percentage or byte size value,
* returning true if valid, false if invalid.
* @return the watermark value given
*/
public static String validWatermarkSetting(String watermark, String settingName) {
try {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -52,8 +53,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
public static final String NAME = "throttling";
public static final String CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES = "cluster.routing.allocation.concurrent_recoveries";
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING = Setting.intSetting("cluster.routing.allocation.node_initial_primaries_recoveries", DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", (s) -> s.get(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES,Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES)), Integer::parseInt, true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING = Setting.intSetting("cluster.routing.allocation.node_initial_primaries_recoveries", DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 0, true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", (s) -> s.get(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES,Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES)), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_recoveries"), true, Setting.Scope.CLUSTER);
private volatile int primariesInitialRecoveries;
private volatile int concurrentRecoveries;
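
The default function above preserves the legacy fallback: if the new per-node key is absent, the value of the old cluster.routing.allocation.concurrent_recoveries key is used as the default. A minimal sketch of that behaviour:

    // falls back to the legacy key when the new key is not set
    Settings s = Settings.builder().put("cluster.routing.allocation.concurrent_recoveries", 5).build();
    int n = ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.get(s); // 5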

View File

@ -64,7 +64,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30), true, Setting.Scope.CLUSTER);
public static final String SETTING_CLUSTER_SERVICE_RECONNECT_INTERVAL = "cluster.service.reconnect_interval";
public static final Setting<TimeValue> CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.service.reconnect_interval", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);
public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
private final ThreadPool threadPool;
@ -123,7 +123,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold);
this.reconnectInterval = this.settings.getAsTime(SETTING_CLUSTER_SERVICE_RECONNECT_INTERVAL, TimeValue.timeValueSeconds(10));
this.reconnectInterval = CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING.get(settings);
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
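
The reconnect interval is now read through the new Setting constant rather than a raw settings.getAsTime lookup; a small sketch of the default behaviour, assuming nothing is configured:

    // 10s default from positiveTimeSetting when the key is unset
    TimeValue interval = InternalClusterService.CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY);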

View File

@ -174,6 +174,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
/**
* Returns the instance value for the current settings. This method is stateless and idempotent.
* This method will throw an exception if the source of this value is invalid.
*/
T getValue(Settings current, Settings previous);

View File

@ -135,5 +135,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING)));
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
InternalClusterService.CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING)));
}

View File

@ -30,11 +30,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
/**
*/
@ -117,7 +115,7 @@ public class Setting<T> extends ToXContentToBytes {
try {
return parser.apply(value);
} catch (ElasticsearchParseException ex) {
throw ex;
throw new IllegalArgumentException(ex.getMessage(), ex);
} catch (Exception t) {
throw new IllegalArgumentException("Failed to parse value [" + value + "] for setting [" + getKey() + "]", t);
}
@ -146,6 +144,7 @@ public class Setting<T> extends ToXContentToBytes {
builder.field("key", key);
builder.field("type", scope.name());
builder.field("dynamic", dynamic);
builder.field("is_group_setting", isGroupSetting());
builder.field("default", defaultValue.apply(Settings.EMPTY));
builder.endObject();
return builder;
@ -163,9 +162,9 @@ public class Setting<T> extends ToXContentToBytes {
return newUpdater(consumer, logger, (s) -> {});
}
AbstractScopedSettings.SettingUpdater newUpdater(Consumer<T> consumer, ESLogger logger, Consumer<T> accept) {
AbstractScopedSettings.SettingUpdater newUpdater(Consumer<T> consumer, ESLogger logger, Consumer<T> validator) {
if (isDynamic()) {
return new Updater(consumer, logger, accept);
return new Updater(consumer, logger, validator);
} else {
throw new IllegalStateException("setting [" + getKey() + "] is not dynamic");
}
@ -222,6 +221,9 @@ public class Setting<T> extends ToXContentToBytes {
public boolean hasChanged(Settings current, Settings previous) {
final String newValue = getRaw(current);
final String value = getRaw(previous);
assert isGroupSetting() == false : "group settings must override this method";
assert value != null : "value was null but can't be unless default is null which is invalid";
return value.equals(newValue) == false;
}
@ -258,14 +260,26 @@ public class Setting<T> extends ToXContentToBytes {
return new Setting<>(key, (s) -> Float.toString(defaultValue), (s) -> {
float value = Float.parseFloat(s);
if (value < minValue) {
throw new ElasticsearchParseException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
}
return value;
}, dynamic, scope);
}
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, boolean dynamic, Scope scope) {
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), dynamic, scope);
}
public static int parseInt(String s, int minValue, String key) {
int value = Integer.parseInt(s);
if (value < minValue) {
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
}
return value;
}
public static Setting<Integer> intSetting(String key, int defaultValue, boolean dynamic, Scope scope) {
return new Setting<>(key, (s) -> Integer.toString(defaultValue), Integer::parseInt, dynamic, scope);
return intSetting(key, defaultValue, Integer.MIN_VALUE, dynamic, scope);
}
public static Setting<Boolean> boolSetting(String key, boolean defaultValue, boolean dynamic, Scope scope) {
@ -306,7 +320,7 @@ public class Setting<T> extends ToXContentToBytes {
}
@Override
public AbstractScopedSettings.SettingUpdater<Settings> newUpdater(Consumer<Settings> consumer, ESLogger logger, Consumer<Settings> accept) {
public AbstractScopedSettings.SettingUpdater<Settings> newUpdater(Consumer<Settings> consumer, ESLogger logger, Consumer<Settings> validator) {
if (isDynamic() == false) {
throw new IllegalStateException("setting [" + getKey() + "] is not dynamic");
}
@ -325,7 +339,7 @@ public class Setting<T> extends ToXContentToBytes {
Settings currentSettings = get(current);
Settings previousSettings = get(previous);
try {
accept.accept(currentSettings);
validator.accept(currentSettings);
} catch (Exception | AssertionError e) {
throw new IllegalArgumentException("illegal value can't update [" + key + "] from [" + previousSettings.getAsMap() + "] to [" + currentSettings.getAsMap() + "]", e);
}
@ -350,7 +364,7 @@ public class Setting<T> extends ToXContentToBytes {
return new Setting<>(key, defaultValue, (s) -> {
TimeValue timeValue = TimeValue.parseTimeValue(s, null, key);
if (timeValue.millis() < minValue.millis()) {
throw new ElasticsearchParseException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
}
return timeValue;
}, dynamic, scope);
@ -368,7 +382,7 @@ public class Setting<T> extends ToXContentToBytes {
return new Setting<>(key, (s) -> Double.toString(defaultValue), (s) -> {
final double d = Double.parseDouble(s);
if (d < minValue) {
throw new ElasticsearchParseException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
}
return d;
}, dynamic, scope);
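
A brief sketch of the bounded integer factory and parseInt helper added above (the setting name is hypothetical): out-of-range values now surface as IllegalArgumentException, and parser ElasticsearchParseExceptions are wrapped the same way.

    // hypothetical setting with a lower bound of 0
    Setting<Integer> example = Setting.intSetting("example.max_ops", 2, 0, true, Setting.Scope.CLUSTER);
    example.get(Settings.builder().put("example.max_ops", 4).build());   // -> 4
    example.get(Settings.builder().put("example.max_ops", -1).build());  // -> IllegalArgumentException (must be >= 0)
    Setting.parseInt("-1", 0, "example.max_ops");                        // -> IllegalArgumentException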

View File

@ -50,13 +50,13 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.fielddata.limit", "60%", true, Setting.Scope.CLUSTER);
public static final Setting<Double> FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.doubleSetting("indices.breaker.fielddata.overhead", 1.03d, 0.0d, true, Setting.Scope.CLUSTER);
public static final String FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.fielddata.type";
public static final Setting<CircuitBreaker.Type> FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING = new Setting<>("indices.breaker.fielddata.type", "memory", CircuitBreaker.Type::parseValue, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.request.limit", "40%", true, Setting.Scope.CLUSTER);
public static final Setting<Double> REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.doubleSetting("indices.breaker.request.overhead", 1.0d, 0.0d, true, Setting.Scope.CLUSTER);
public static final String REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = "indices.breaker.request.type";
public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, false, Setting.Scope.CLUSTER);
public static final String DEFAULT_BREAKER_TYPE = "memory";
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
@ -71,13 +71,13 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
this.fielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA,
FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(),
FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
CircuitBreaker.Type.parseValue(settings.get(FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, DEFAULT_BREAKER_TYPE))
FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);
this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST,
REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(),
REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
CircuitBreaker.Type.parseValue(settings.get(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, DEFAULT_BREAKER_TYPE))
REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0, CircuitBreaker.Type.PARENT);
@ -93,10 +93,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setRequestBreakerLimit);
}
private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) {
long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes();
newRequestOverhead = newRequestOverhead == null ? HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead;
BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestLimitBytes, newRequestOverhead,
BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestMax.bytes(), newRequestOverhead,
HierarchyCircuitBreakerService.this.requestSettings.getType());
registerBreaker(newRequestSettings);
HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings;
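
With the breaker type expressed as a typed Setting<CircuitBreaker.Type>, the constructor reads it directly; a minimal sketch of the default, assuming no override is configured:

    // "memory" default, parsed by CircuitBreaker.Type::parseValue
    CircuitBreaker.Type type = HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(Settings.EMPTY);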

View File

@ -202,7 +202,7 @@ public class Node implements Releasable {
injector = modules.createInjector();
client = injector.getInstance(Client.class);
threadPool.setNodeSettingsService(injector.getInstance(ClusterSettings.class));
threadPool.setClusterSettings(injector.getInstance(ClusterSettings.class));
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);

View File

@ -250,7 +250,7 @@ public class ThreadPool extends AbstractComponent {
this.estimatedTimeThread.start();
}
public void setNodeSettingsService(ClusterSettings clusterSettings) {
public void setClusterSettings(ClusterSettings clusterSettings) {
if(settingsListenerIsSet.compareAndSet(false, true)) {
clusterSettings.addSettingsUpdateConsumer(THREADPOOL_GROUP_SETTING, this::updateSettings, (s) -> validate(s.getAsGroups()));
} else {

View File

@ -47,7 +47,7 @@ public class SettingTests extends ESTestCase {
try {
settingUpdater.apply(Settings.builder().put("a.byte.size", 12).build(), Settings.EMPTY);
fail("no unit");
} catch (ElasticsearchParseException ex) {
} catch (IllegalArgumentException ex) {
assertEquals("failed to parse setting [a.byte.size] with value [12] as a size in bytes: unit is missing or unrecognized", ex.getMessage());
}

View File

@ -92,7 +92,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
try {
threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType").build());
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setNodeSettingsService(clusterSettings);
threadPool.setClusterSettings(clusterSettings);
clusterSettings.applySettings(
settingsBuilder()
@ -118,7 +118,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
.put("name", "testCachedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setNodeSettingsService(clusterSettings);
threadPool.setClusterSettings(clusterSettings);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
@ -169,7 +169,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
.put("name", "testFixedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setNodeSettingsService(clusterSettings);
threadPool.setClusterSettings(clusterSettings);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
Settings settings = clusterSettings.applySettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".size", "15")
@ -224,7 +224,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
.put("name", "testScalingExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setNodeSettingsService(clusterSettings);
threadPool.setClusterSettings(clusterSettings);
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L));
@ -262,7 +262,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
.put("name", "testCachedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setNodeSettingsService(clusterSettings);
threadPool.setClusterSettings(clusterSettings);
assertEquals(info(threadPool, threadPoolName).getQueueSize().getSingles(), 1000L);
final CountDownLatch latch = new CountDownLatch(1);
@ -299,7 +299,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
.put("name", "testCustomThreadPool").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setNodeSettingsService(clusterSettings);
threadPool.setClusterSettings(clusterSettings);
ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
boolean foundPool2 = false;

View File

@ -64,7 +64,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
@Before
public void startThreadPool() {
threadPool = new ThreadPool(settings);
threadPool.setNodeSettingsService(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
threadPool.setClusterSettings(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
NetworkService networkService = new NetworkService(settings);
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry());