Settings: migrate network service to the new infra

This commit migrates all the settings under network service to the new settings infra.

It also adds some chaining utils to make fall back settings slightly less verbose.

Breaking (but I think acceptable)  - network.tcp.no_delay and network.tcp.keep_alive used to accept the value `default` which make us not set them at all on netty. Our default was true so we weren't using this feature. I removed it and now we only accept a true boolean.
This commit is contained in:
Boaz Leskes 2016-01-22 16:34:05 +01:00
parent ae5da3432c
commit ec31feca93
10 changed files with 195 additions and 135 deletions

View File

@ -19,7 +19,9 @@
package org.elasticsearch.common.network;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -41,31 +43,30 @@ public class NetworkService extends AbstractComponent {
/** By default, we bind to loopback interfaces */
public static final String DEFAULT_NETWORK_HOST = "_local_";
private static final String GLOBAL_NETWORK_HOST_SETTING = "network.host";
private static final String GLOBAL_NETWORK_BINDHOST_SETTING = "network.bind_host";
private static final String GLOBAL_NETWORK_PUBLISHHOST_SETTING = "network.publish_host";
public static final Setting<List<String>> GLOBAL_NETWORK_HOST_SETTING = Setting.listSetting("network.host", Arrays.asList(DEFAULT_NETWORK_HOST),
s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> GLOBAL_NETWORK_BINDHOST_SETTING = Setting.listSetting("network.bind_host", GLOBAL_NETWORK_HOST_SETTING,
s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> GLOBAL_NETWORK_PUBLISHHOST_SETTING = Setting.listSetting("network.publish_host", GLOBAL_NETWORK_HOST_SETTING,
s -> s, false, Setting.Scope.CLUSTER);
public static final class TcpSettings {
public static final String TCP_NO_DELAY = "network.tcp.no_delay";
public static final String TCP_KEEP_ALIVE = "network.tcp.keep_alive";
public static final String TCP_REUSE_ADDRESS = "network.tcp.reuse_address";
public static final String TCP_SEND_BUFFER_SIZE = "network.tcp.send_buffer_size";
public static final String TCP_RECEIVE_BUFFER_SIZE = "network.tcp.receive_buffer_size";
public static final String TCP_BLOCKING = "network.tcp.blocking";
public static final String TCP_BLOCKING_SERVER = "network.tcp.blocking_server";
public static final String TCP_BLOCKING_CLIENT = "network.tcp.blocking_client";
public static final String TCP_CONNECT_TIMEOUT = "network.tcp.connect_timeout";
public static final ByteSizeValue TCP_DEFAULT_SEND_BUFFER_SIZE = null;
public static final ByteSizeValue TCP_DEFAULT_RECEIVE_BUFFER_SIZE = null;
public static final TimeValue TCP_DEFAULT_CONNECT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
public static final Setting<Boolean> TCP_NO_DELAY = Setting.boolSetting("network.tcp.no_delay", true, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_KEEP_ALIVE = Setting.boolSetting("network.tcp.keep_alive", true, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_REUSE_ADDRESS = Setting.boolSetting("network.tcp.reuse_address", NetworkUtils.defaultReuseAddress(), false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE = Setting.byteSizeSetting("network.tcp.send_buffer_size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE = Setting.byteSizeSetting("network.tcp.receive_buffer_size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING = Setting.boolSetting("network.tcp.blocking", false, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_SERVER = Setting.boolSetting("network.tcp.blocking_server", TCP_BLOCKING, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = Setting.boolSetting("network.tcp.blocking_client", TCP_BLOCKING, false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = Setting.timeSetting("network.tcp.connect_timeout", new TimeValue(30, TimeUnit.SECONDS), false, Setting.Scope.CLUSTER);
}
/**
* A custom name resolver can support custom lookup keys (my_net_key:ipv4) and also change
* the default inet address used in case no settings is provided.
*/
public static interface CustomNameResolver {
public interface CustomNameResolver {
/**
* Resolves the default value if possible. If not, return <tt>null</tt>.
*/
@ -94,6 +95,7 @@ public class NetworkService extends AbstractComponent {
/**
* Resolves {@code bindHosts} to a list of internet addresses. The list will
* not contain duplicate addresses.
*
* @param bindHosts list of hosts to bind to. this may contain special pseudo-hostnames
* such as _local_ (see the documentation). if it is null, it will be populated
* based on global default settings.
@ -102,21 +104,22 @@ public class NetworkService extends AbstractComponent {
public InetAddress[] resolveBindHostAddresses(String bindHosts[]) throws IOException {
// first check settings
if (bindHosts == null) {
bindHosts = settings.getAsArray(GLOBAL_NETWORK_BINDHOST_SETTING, settings.getAsArray(GLOBAL_NETWORK_HOST_SETTING, null));
}
// next check any registered custom resolvers
if (bindHosts == null) {
for (CustomNameResolver customNameResolver : customNameResolvers) {
InetAddress addresses[] = customNameResolver.resolveDefault();
if (addresses != null) {
return addresses;
if (GLOBAL_NETWORK_BINDHOST_SETTING.exists(settings) || GLOBAL_NETWORK_HOST_SETTING.exists(settings)) {
// if we have settings use them (we have a fallback to GLOBAL_NETWORK_HOST_SETTING inline
bindHosts = GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
} else {
// next check any registered custom resolvers
for (CustomNameResolver customNameResolver : customNameResolvers) {
InetAddress addresses[] = customNameResolver.resolveDefault();
if (addresses != null) {
return addresses;
}
}
// we know it's not here. get the defaults
bindHosts = GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
}
}
// finally, fill with our default
if (bindHosts == null) {
bindHosts = new String[] { DEFAULT_NETWORK_HOST };
}
InetAddress addresses[] = resolveInetAddresses(bindHosts);
// try to deal with some (mis)configuration
@ -138,6 +141,7 @@ public class NetworkService extends AbstractComponent {
* only one address is just a current limitation.
* <p>
* If {@code publishHosts} resolves to more than one address, <b>then one is selected with magic</b>
*
* @param publishHosts list of hosts to publish as. this may contain special pseudo-hostnames
* such as _local_ (see the documentation). if it is null, it will be populated
* based on global default settings.
@ -145,23 +149,23 @@ public class NetworkService extends AbstractComponent {
*/
// TODO: needs to be InetAddress[]
public InetAddress resolvePublishHostAddresses(String publishHosts[]) throws IOException {
// first check settings
if (publishHosts == null) {
publishHosts = settings.getAsArray(GLOBAL_NETWORK_PUBLISHHOST_SETTING, settings.getAsArray(GLOBAL_NETWORK_HOST_SETTING, null));
}
// next check any registered custom resolvers
if (publishHosts == null) {
for (CustomNameResolver customNameResolver : customNameResolvers) {
InetAddress addresses[] = customNameResolver.resolveDefault();
if (addresses != null) {
return addresses[0];
if (GLOBAL_NETWORK_PUBLISHHOST_SETTING.exists(settings) || GLOBAL_NETWORK_HOST_SETTING.exists(settings)) {
// if we have settings use them (we have a fallback to GLOBAL_NETWORK_HOST_SETTING inline
publishHosts = GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
} else {
// next check any registered custom resolvers
for (CustomNameResolver customNameResolver : customNameResolvers) {
InetAddress addresses[] = customNameResolver.resolveDefault();
if (addresses != null) {
return addresses[0];
}
}
// we know it's not here. get the defaults
publishHosts = GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
}
}
// finally, fill with our default
if (publishHosts == null) {
publishHosts = new String[] { DEFAULT_NETWORK_HOST };
}
InetAddress addresses[] = resolveInetAddresses(publishHosts);
// TODO: allow publishing multiple addresses
// for now... the hack begins
@ -184,17 +188,17 @@ public class NetworkService extends AbstractComponent {
throw new IllegalArgumentException("publish address: {" + NetworkAddress.format(address) + "} is wildcard, but multiple addresses specified: this makes no sense");
}
}
// 3. if we end out with multiple publish addresses, select by preference.
// don't warn the user, or they will get confused by bind_host vs publish_host etc.
if (addresses.length > 1) {
List<InetAddress> sorted = new ArrayList<>(Arrays.asList(addresses));
NetworkUtils.sortAddresses(sorted);
addresses = new InetAddress[] { sorted.get(0) };
addresses = new InetAddress[]{sorted.get(0)};
}
return addresses[0];
}
/** resolves (and deduplicates) host specification */
private InetAddress[] resolveInetAddresses(String hosts[]) throws IOException {
if (hosts.length == 0) {

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
@ -192,6 +193,18 @@ public final class ClusterSettings extends AbstractScopedSettings {
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
Transport.TRANSPORT_PROFILES_SETTING,
Transport.TRANSPORT_TCP_COMPRESS,
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
NetworkService.TcpSettings.TCP_NO_DELAY,
NetworkService.TcpSettings.TCP_KEEP_ALIVE,
NetworkService.TcpSettings.TCP_REUSE_ADDRESS,
NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE,
NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE,
NetworkService.TcpSettings.TCP_BLOCKING,
NetworkService.TcpSettings.TCP_BLOCKING_SERVER,
NetworkService.TcpSettings.TCP_BLOCKING_CLIENT,
NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT,
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
PrimaryShardAllocator.NODE_INITIAL_SHARDS_SETTING,

View File

@ -41,6 +41,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* A setting. Encapsulates typical stuff like default value, parsing, and scope.
@ -70,8 +71,21 @@ public class Setting<T> extends ToXContentToBytes {
this.scope = scope;
}
/**
* Creates a new Setting instance
* @param key the settings key for this setting.
* @param fallBackSetting a setting to fall back to if the current setting is not set.
* @param parser a parser that parses the string rep into a complex datatype.
* @param dynamic true iff this setting can be dynamically updateable
* @param scope the scope of this setting
*/
public Setting(String key, Setting<T> fallBackSetting, Function<String, T> parser, boolean dynamic, Scope scope) {
this(key, fallBackSetting::getRaw, parser, dynamic, scope);
}
/**
* Returns the settings key or a prefix if this setting is a group setting
*
* @see #isGroupSetting()
*/
public final String getKey() {
@ -106,13 +120,21 @@ public class Setting<T> extends ToXContentToBytes {
}
/**
* Returns the default values string representation for this setting.
* Returns the default value string representation for this setting.
* @param settings a settings object for settings that has a default value depending on another setting if available
*/
public final String getDefault(Settings settings) {
public final String getDefaultRaw(Settings settings) {
return defaultValue.apply(settings);
}
/**
* Returns the default value for this setting.
* @param settings a settings object for settings that has a default value depending on another setting if available
*/
public final T getDefault(Settings settings) {
return parser.apply(getDefaultRaw(settings));
}
/**
* Returns <code>true</code> iff this setting is present in the given settings object. Otherwise <code>false</code>
*/
@ -337,6 +359,10 @@ public class Setting<T> extends ToXContentToBytes {
return new Setting<>(key, (s) -> Boolean.toString(defaultValue), Booleans::parseBooleanExact, dynamic, scope);
}
public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallbackSetting, boolean dynamic, Scope scope) {
return new Setting<>(key, fallbackSetting, Booleans::parseBooleanExact, dynamic, scope);
}
public static Setting<ByteSizeValue> byteSizeSetting(String key, String percentage, boolean dynamic, Scope scope) {
return new Setting<>(key, (s) -> percentage, (s) -> MemorySizeValue.parseBytesSizeValueOrHeapRatio(s, key), dynamic, scope);
}
@ -352,25 +378,15 @@ public class Setting<T> extends ToXContentToBytes {
public static <T> Setting<List<T>> listSetting(String key, List<String> defaultStringValue, Function<String, T> singleValueParser, boolean dynamic, Scope scope) {
return listSetting(key, (s) -> defaultStringValue, singleValueParser, dynamic, scope);
}
public static <T> Setting<List<T>> listSetting(String key, Setting<List<T>> fallbackSetting, Function<String, T> singleValueParser, boolean dynamic, Scope scope) {
return listSetting(key, (s) -> parseableStringToList(fallbackSetting.getRaw(s)), singleValueParser, dynamic, scope);
}
public static <T> Setting<List<T>> listSetting(String key, Function<Settings, List<String>> defaultStringValue, Function<String, T> singleValueParser, boolean dynamic, Scope scope) {
Function<String, List<T>> parser = (s) -> {
try (XContentParser xContentParser = XContentType.JSON.xContent().createParser(s)){
XContentParser.Token token = xContentParser.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException("expected START_ARRAY but got " + token);
}
ArrayList<T> list = new ArrayList<>();
while ((token = xContentParser.nextToken()) !=XContentParser.Token.END_ARRAY) {
if (token != XContentParser.Token.VALUE_STRING) {
throw new IllegalArgumentException("expected VALUE_STRING but got " + token);
}
list.add(singleValueParser.apply(xContentParser.text()));
}
return list;
} catch (IOException e) {
throw new IllegalArgumentException("failed to parse array", e);
}
};
Function<String, List<T>> parser = (s) ->
parseableStringToList(s).stream().map(singleValueParser).collect(Collectors.toList());
return new Setting<List<T>>(key, (s) -> arrayToParsableString(defaultStringValue.apply(s).toArray(Strings.EMPTY_ARRAY)), parser, dynamic, scope) {
private final Pattern pattern = Pattern.compile(Pattern.quote(key)+"(\\.\\d+)?");
@Override
@ -391,6 +407,26 @@ public class Setting<T> extends ToXContentToBytes {
};
}
private static List<String> parseableStringToList(String parsableString) {
try (XContentParser xContentParser = XContentType.JSON.xContent().createParser(parsableString)) {
XContentParser.Token token = xContentParser.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException("expected START_ARRAY but got " + token);
}
ArrayList<String> list = new ArrayList<>();
while ((token = xContentParser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token != XContentParser.Token.VALUE_STRING) {
throw new IllegalArgumentException("expected VALUE_STRING but got " + token);
}
list.add(xContentParser.text());
}
return list;
} catch (IOException e) {
throw new IllegalArgumentException("failed to parse array", e);
}
}
private static String arrayToParsableString(String[] array) {
try {
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());

View File

@ -19,7 +19,6 @@
package org.elasticsearch.http.netty;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -27,7 +26,6 @@ import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Scope;
import org.elasticsearch.common.settings.Settings;
@ -77,9 +75,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_SERVER;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_SEND_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE;
@ -98,11 +93,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
public static final Setting<Boolean> SETTING_CORS_ENABLED = Setting.boolSetting("http.cors.enabled", false, false, Scope.CLUSTER);
public static final String SETTING_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
public static final Setting<Integer> SETTING_CORS_MAX_AGE = Setting.intSetting("http.cors.max-age", 1728000, false, Scope.CLUSTER);
public static final Setting<Integer> SETTING_CORS_MAX_AGE = Setting.intSetting("http.cors.max-age", 1728000, false, Scope.CLUSTER);
public static final String SETTING_CORS_ALLOW_METHODS = "http.cors.allow-methods";
public static final String SETTING_CORS_ALLOW_HEADERS = "http.cors.allow-headers";
public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS = Setting.boolSetting("http.cors.allow-credentials", false, false, Scope.CLUSTER);
public static final Setting<Boolean> SETTING_PIPELINING = Setting.boolSetting("http.pipelining", true, false, Scope.CLUSTER);
public static final String SETTING_PIPELINING_MAX_EVENTS = "http.pipelining.max_events";
public static final String SETTING_HTTP_COMPRESSION = "http.compression";
@ -144,8 +139,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
protected int publishPort;
protected final String tcpNoDelay;
protected final String tcpKeepAlive;
protected final boolean tcpNoDelay;
protected final boolean tcpKeepAlive;
protected final boolean reuseAddress;
protected final ByteSizeValue tcpSendBufferSize;
@ -188,16 +183,16 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
this.maxCumulationBufferCapacity = settings.getAsBytesSize("http.netty.max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = settings.getAsInt("http.netty.max_composite_buffer_components", -1);
this.workerCount = settings.getAsInt("http.netty.worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.blockingServer = settings.getAsBoolean("http.netty.http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingServer = settings.getAsBoolean("http.netty.http.blocking_server", TCP_BLOCKING.get(settings));
this.port = settings.get("http.netty.port", settings.get("http.port", DEFAULT_PORT_RANGE));
this.bindHosts = settings.getAsArray("http.netty.bind_host", settings.getAsArray("http.bind_host", settings.getAsArray("http.host", null)));
this.publishHosts = settings.getAsArray("http.netty.publish_host", settings.getAsArray("http.publish_host", settings.getAsArray("http.host", null)));
this.publishPort = settings.getAsInt("http.netty.publish_port", settings.getAsInt("http.publish_port", 0));
this.tcpNoDelay = settings.get("http.netty.tcp_no_delay", settings.get(TCP_NO_DELAY, "true"));
this.tcpKeepAlive = settings.get("http.netty.tcp_keep_alive", settings.get(TCP_KEEP_ALIVE, "true"));
this.reuseAddress = settings.getAsBoolean("http.netty.reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = settings.getAsBytesSize("http.netty.tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
this.tcpReceiveBufferSize = settings.getAsBytesSize("http.netty.tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
this.tcpNoDelay = settings.getAsBoolean("http.netty.tcp_no_delay", TCP_NO_DELAY.get(settings));
this.tcpKeepAlive = settings.getAsBoolean("http.netty.tcp_keep_alive", TCP_KEEP_ALIVE.get(settings));
this.reuseAddress = settings.getAsBoolean("http.netty.reuse_address", TCP_REUSE_ADDRESS.get(settings));
this.tcpSendBufferSize = settings.getAsBytesSize("http.netty.tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.get(settings));
this.tcpReceiveBufferSize = settings.getAsBytesSize("http.netty.tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.get(settings));
this.detailedErrorsEnabled = SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings);
long defaultReceiverPredictor = 512 * 1024;
@ -259,16 +254,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
if (!"default".equals(tcpNoDelay)) {
serverBootstrap.setOption("child.tcpNoDelay", Booleans.parseBoolean(tcpNoDelay, null));
}
if (!"default".equals(tcpKeepAlive)) {
serverBootstrap.setOption("child.keepAlive", Booleans.parseBoolean(tcpKeepAlive, null));
}
if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
if (tcpSendBufferSize.bytes() > 0) {
serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
}
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
if (tcpReceiveBufferSize.bytes() > 0) {
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
}
serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
@ -310,7 +302,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
throw new BindHttpException("Publish address [" + publishInetAddress + "] does not match any of the bound addresses [" + boundAddresses + "]");
}
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);;
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);
;
this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[boundAddresses.size()]), new InetSocketTransportAddress(publishAddress));
}

View File

@ -117,13 +117,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_CLIENT;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_SERVER;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_SEND_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE;
@ -221,8 +217,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
this.workerCount = settings.getAsInt(WORKER_COUNT, EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.blockingClient = settings.getAsBoolean("transport.netty.transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
this.connectTimeout = this.settings.getAsTime("transport.netty.connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", settings.getAsTime(TCP_CONNECT_TIMEOUT, TCP_DEFAULT_CONNECT_TIMEOUT)));
this.blockingClient = settings.getAsBoolean("transport.netty.transport.tcp.blocking_client", TCP_BLOCKING_CLIENT.get(settings));
this.connectTimeout = this.settings.getAsTime("transport.netty.connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", TCP_CONNECT_TIMEOUT.get(settings)));
this.maxCumulationBufferCapacity = this.settings.getAsBytesSize("transport.netty.max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = this.settings.getAsInt("transport.netty.max_composite_buffer_components", -1);
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
@ -362,29 +358,25 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
String tcpNoDelay = settings.get("transport.netty.tcp_no_delay", settings.get(TCP_NO_DELAY, "true"));
if (!"default".equals(tcpNoDelay)) {
clientBootstrap.setOption("tcpNoDelay", Booleans.parseBoolean(tcpNoDelay, null));
}
boolean tcpNoDelay = settings.getAsBoolean("transport.netty.tcp_no_delay", TCP_NO_DELAY.get(settings));
clientBootstrap.setOption("tcpNoDelay", tcpNoDelay);
String tcpKeepAlive = settings.get("transport.netty.tcp_keep_alive", settings.get(TCP_KEEP_ALIVE, "true"));
if (!"default".equals(tcpKeepAlive)) {
clientBootstrap.setOption("keepAlive", Booleans.parseBoolean(tcpKeepAlive, null));
}
boolean tcpKeepAlive = settings.getAsBoolean("transport.netty.tcp_keep_alive", TCP_KEEP_ALIVE.get(settings));
clientBootstrap.setOption("keepAlive", tcpKeepAlive);
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.get(settings));
if (tcpSendBufferSize.bytes() > 0) {
clientBootstrap.setOption("sendBufferSize", tcpSendBufferSize.bytes());
}
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.get(settings));
if (tcpReceiveBufferSize.bytes() > 0) {
clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
}
clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
boolean reuseAddress = settings.getAsBoolean("transport.netty.reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
boolean reuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TCP_REUSE_ADDRESS.get(settings));
clientBootstrap.setOption("reuseAddress", reuseAddress);
return clientBootstrap;
@ -403,26 +395,22 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
fallbackSettingsBuilder.put("publish_host", fallbackPublishHost);
}
String fallbackTcpNoDelay = settings.get("transport.netty.tcp_no_delay", settings.get(TCP_NO_DELAY, "true"));
if (fallbackTcpNoDelay != null) {
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
}
boolean fallbackTcpNoDelay = settings.getAsBoolean("transport.netty.tcp_no_delay", TCP_NO_DELAY.get(settings));
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
String fallbackTcpKeepAlive = settings.get("transport.netty.tcp_keep_alive", settings.get(TCP_KEEP_ALIVE, "true"));
if (fallbackTcpKeepAlive != null) {
boolean fallbackTcpKeepAlive = settings.getAsBoolean("transport.netty.tcp_keep_alive", TCP_KEEP_ALIVE.get(settings));
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
}
boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TCP_REUSE_ADDRESS.get(settings));
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
if (fallbackTcpSendBufferSize != null) {
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.get(settings));
if (fallbackTcpSendBufferSize.bytes() >= 0) {
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
}
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
if (fallbackTcpBufferSize != null) {
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.get(settings));
if (fallbackTcpBufferSize.bytes() >= 0) {
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
}
@ -552,15 +540,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
private void createServerBootstrap(String name, Settings settings) {
boolean blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", this.settings.getAsBoolean(TCP_BLOCKING_SERVER, this.settings.getAsBoolean(TCP_BLOCKING, false)));
boolean blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", TCP_BLOCKING_SERVER.get(settings));
String port = settings.get("port");
String bindHost = settings.get("bind_host");
String publishHost = settings.get("publish_host");
String tcpNoDelay = settings.get("tcp_no_delay");
String tcpKeepAlive = settings.get("tcp_keep_alive");
boolean reuseAddress = settings.getAsBoolean("reuse_address", NetworkUtils.defaultReuseAddress());
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("tcp_send_buffer_size", TCP_DEFAULT_SEND_BUFFER_SIZE);
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("tcp_receive_buffer_size", TCP_DEFAULT_RECEIVE_BUFFER_SIZE);
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.getDefault(settings));
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.getDefault(settings));
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);

View File

@ -104,13 +104,17 @@ public class SettingTests extends ESTestCase {
TimeValue defautlValue = TimeValue.timeValueMillis(randomIntBetween(0, 1000000));
Setting<TimeValue> setting = Setting.positiveTimeSetting("my.time.value", defautlValue, randomBoolean(), Setting.Scope.CLUSTER);
assertFalse(setting.isGroupSetting());
String aDefault = setting.getDefault(Settings.EMPTY);
String aDefault = setting.getDefaultRaw(Settings.EMPTY);
assertEquals(defautlValue.millis() + "ms", aDefault);
assertEquals(defautlValue.millis(), setting.get(Settings.EMPTY).millis());
assertEquals(defautlValue, setting.getDefault(Settings.EMPTY));
Setting<String> secondaryDefault = new Setting<>("foo.bar", (s) -> s.get("old.foo.bar", "some_default"), (s) -> s, randomBoolean(), Setting.Scope.CLUSTER);
assertEquals("some_default", secondaryDefault.get(Settings.EMPTY));
assertEquals("42", secondaryDefault.get(Settings.builder().put("old.foo.bar", 42).build()));
Setting<String> secondaryDefaultViaSettings = new Setting<>("foo.bar", secondaryDefault, (s) -> s, randomBoolean(), Setting.Scope.CLUSTER);
assertEquals("some_default", secondaryDefaultViaSettings.get(Settings.EMPTY));
assertEquals("42", secondaryDefaultViaSettings.get(Settings.builder().put("old.foo.bar", 42).build()));
}
public void testComplexType() {
@ -298,6 +302,26 @@ public class SettingTests extends ESTestCase {
for (int i = 0; i < intValues.size(); i++) {
assertEquals(i, intValues.get(i).intValue());
}
Setting<List<String>> settingWithFallback = Setting.listSetting("foo.baz", listSetting, s -> s, true, Setting.Scope.CLUSTER);
value = settingWithFallback.get(Settings.EMPTY);
assertEquals(1, value.size());
assertEquals("foo,bar", value.get(0));
value = settingWithFallback.get(Settings.builder().putArray("foo.bar", "1", "2").build());
assertEquals(2, value.size());
assertEquals("1", value.get(0));
assertEquals("2", value.get(1));
value = settingWithFallback.get(Settings.builder().putArray("foo.baz", "3", "4").build());
assertEquals(2, value.size());
assertEquals("3", value.get(0));
assertEquals("4", value.get(1));
value = settingWithFallback.get(Settings.builder().putArray("foo.baz", "3", "4").putArray("foo.bar", "1", "2").build());
assertEquals(2, value.size());
assertEquals("3", value.get(0));
assertEquals("4", value.get(1));
}
public void testListSettingAcceptsNumberSyntax() {

View File

@ -64,11 +64,11 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
logger.info("--> resetting breaker settings");
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(),
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getDefault(null))
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getDefaultRaw(null))
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(),
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.getDefault(null))
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.getDefaultRaw(null))
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(),
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getDefault(null))
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getDefaultRaw(null))
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings));

View File

@ -61,7 +61,7 @@ public class DateHistogramOffsetIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(AssertingLocalTransport.ASSERTING_TRANSPORT_MIN_VERSION_KEY, Version.V_1_4_0_Beta1).build();
.put(AssertingLocalTransport.ASSERTING_TRANSPORT_MIN_VERSION_KEY.getKey(), Version.V_1_4_0_Beta1).build();
}
@Before

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -60,8 +61,10 @@ public class AssertingLocalTransport extends LocalTransport {
}
}
public static final String ASSERTING_TRANSPORT_MIN_VERSION_KEY = "transport.asserting.version.min";
public static final String ASSERTING_TRANSPORT_MAX_VERSION_KEY = "transport.asserting.version.max";
public static final Setting<Version> ASSERTING_TRANSPORT_MIN_VERSION_KEY = new Setting<>("transport.asserting.version.min",
Version.CURRENT.minimumCompatibilityVersion().toString(), Version::fromString, false, Setting.Scope.CLUSTER);
public static final Setting<Version> ASSERTING_TRANSPORT_MAX_VERSION_KEY = new Setting<>("transport.asserting.version.max",
Version.CURRENT.toString(), Version::fromString, false, Setting.Scope.CLUSTER);
private final Random random;
private final Version minVersion;
private final Version maxVersion;
@ -71,8 +74,8 @@ public class AssertingLocalTransport extends LocalTransport {
super(settings, threadPool, version, namedWriteableRegistry);
final long seed = ESIntegTestCase.INDEX_TEST_SEED_SETTING.get(settings);
random = new Random(seed);
minVersion = settings.getAsVersion(ASSERTING_TRANSPORT_MIN_VERSION_KEY, Version.V_0_18_0);
maxVersion = settings.getAsVersion(ASSERTING_TRANSPORT_MAX_VERSION_KEY, Version.CURRENT);
minVersion = ASSERTING_TRANSPORT_MIN_VERSION_KEY.get(settings);
maxVersion = ASSERTING_TRANSPORT_MAX_VERSION_KEY.get(settings);
}
@Override

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
@ -283,7 +282,7 @@ public class MockTransportService extends TransportService {
}
// TODO: Replace with proper setting
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());
@ -306,7 +305,7 @@ public class MockTransportService extends TransportService {
}
// TODO: Replace with proper setting
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());