Migrate the rest of NettyTransport settings to the new infra

Also does some consistency clean up, renaming trasnport.netty.* settings to transport.*

Closes #16307
This commit is contained in:
Boaz Leskes 2016-01-29 10:57:58 +01:00
parent af0e40ec7d
commit df80e8f215
8 changed files with 81 additions and 44 deletions

View File

@ -112,7 +112,7 @@ public class TransportClient extends AbstractClient {
final Settings.Builder settingsBuilder = settingsBuilder()
.put(NettyTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
.put(InternalSettingsPreparer.prepareSettings(settings))
.put(NettyTransport.NETWORK_SERVER.getKey(), false)
.put(NetworkService.NETWORK_SERVER.getKey(), false)
.put(Node.NODE_CLIENT_SETTING.getKey(), true)
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
return new PluginsService(settingsBuilder.build(), null, null, pluginClasses);

View File

@ -49,6 +49,7 @@ public class NetworkService extends AbstractComponent {
s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> GLOBAL_NETWORK_PUBLISHHOST_SETTING = Setting.listSetting("network.publish_host", GLOBAL_NETWORK_HOST_SETTING,
s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> NETWORK_SERVER = Setting.boolSetting("network.server", true, false, Setting.Scope.CLUSTER);
public static final class TcpSettings {
public static final Setting<Boolean> TCP_NO_DELAY = Setting.boolSetting("network.tcp.no_delay", true, false, Setting.Scope.CLUSTER);
@ -149,7 +150,7 @@ public class NetworkService extends AbstractComponent {
*/
// TODO: needs to be InetAddress[]
public InetAddress resolvePublishHostAddresses(String publishHosts[]) throws IOException {
if (publishHosts == null) {
if (publishHosts == null || publishHosts.length == 0) {
if (GLOBAL_NETWORK_PUBLISHHOST_SETTING.exists(settings) || GLOBAL_NETWORK_HOST_SETTING.exists(settings)) {
// if we have settings use them (we have a fallback to GLOBAL_NETWORK_HOST_SETTING inline
publishHosts = GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);

View File

@ -226,6 +226,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
Transport.TRANSPORT_TCP_COMPRESS,
TransportSettings.TRANSPORT_PROFILES_SETTING,
TransportSettings.HOST,
TransportSettings.PUBLISH_HOST,
TransportSettings.BIND_HOST,
TransportSettings.PUBLISH_PORT,
TransportSettings.PORT,
NettyTransport.WORKER_COUNT,
NettyTransport.CONNECTIONS_PER_NODE_RECOVERY,
@ -241,8 +245,14 @@ public final class ClusterSettings extends AbstractScopedSettings {
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
NettyTransport.NETTY_RECEIVE_PREDICTOR_MIN,
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
NettyTransport.NETWORK_SERVER,
NetworkService.NETWORK_SERVER,
NettyTransport.NETTY_BOSS_COUNT,
NettyTransport.TCP_NO_DELAY,
NettyTransport.TCP_KEEP_ALIVE,
NettyTransport.TCP_REUSE_ADDRESS,
NettyTransport.TCP_SEND_BUFFER_SIZE,
NettyTransport.TCP_RECEIVE_BUFFER_SIZE,
NettyTransport.TCP_BLOCKING_SERVER,
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,

View File

@ -939,6 +939,14 @@ public final class Settings implements ToXContent {
return this;
}
/**
* Sets the setting with the provided setting key and an array of values.
*
* @param setting The setting key
* @param values The values
* @return The builder
*/
/**
* Sets the setting with the provided setting key and an array of values.
*
@ -947,6 +955,17 @@ public final class Settings implements ToXContent {
* @return The builder
*/
public Builder putArray(String setting, String... values) {
return putArray(setting, Arrays.asList(values));
}
/**
* Sets the setting with the provided setting key and a list of values.
*
* @param setting The setting key
* @param values The values
* @return The builder
*/
public Builder putArray(String setting, List<String> values) {
remove(setting);
int counter = 0;
while (true) {
@ -955,8 +974,8 @@ public final class Settings implements ToXContent {
break;
}
}
for (int i = 0; i < values.length; i++) {
put(setting + "." + i, values[i]);
for (int i = 0; i < values.size(); i++) {
put(setting + "." + i, values.get(i));
}
return this;
}

View File

@ -21,13 +21,21 @@ package org.elasticsearch.transport;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import static java.util.Collections.emptyList;
/**
* a collection of settings related to transport components, which are also needed in org.elasticsearch.bootstrap.Security
* This class should only contain static code which is *safe* to load before the security manager is enforced.
*/
final public class TransportSettings {
public static final Setting<List<String>> HOST = Setting.listSetting("transport.host", emptyList(), s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> PUBLISH_HOST = Setting.listSetting("transport.publish_host", HOST, s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BIND_HOST = Setting.listSetting("transport.bind_host", HOST, s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<String> PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> PUBLISH_PORT = Setting.intSetting("transport.publish_port", -1, -1, false, Setting.Scope.CLUSTER);
public static final String DEFAULT_PROFILE = "default";
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING = Setting.groupSetting("transport.profiles.", true, Setting.Scope.CLUSTER);

View File

@ -119,12 +119,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_SERVER;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_REUSE_ADDRESS;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
@ -158,8 +152,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING = Setting.intSetting("transport.connections_per_node.ping", 1, 1, false, Setting.Scope.CLUSTER);
// the scheduled internal ping interval setting, defaults to disabled (-1)
public static final Setting<TimeValue> PING_SCHEDULE = Setting.timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = Setting.boolSetting("transport." + TcpSettings.TCP_BLOCKING_CLIENT.getKey(), TcpSettings.TCP_BLOCKING_CLIENT, false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = Setting.timeSetting("transport." + TcpSettings.TCP_CONNECT_TIMEOUT.getKey(), TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = Setting.boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT, false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = Setting.timeSetting("transport.tcp.connect_timeout", TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_NO_DELAY = Setting.boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_KEEP_ALIVE = Setting.boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_SERVER = Setting.boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_REUSE_ADDRESS = Setting.boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> NETTY_MAX_CUMULATION_BUFFER_CAPACITY = Setting.byteSizeSetting("transport.netty.max_cumulation_buffer_capacity", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
public static final Setting<Integer> NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = Setting.intSetting("transport.netty.max_composite_buffer_components", -1, -1, false, Setting.Scope.CLUSTER);
@ -179,7 +181,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX = Setting.byteSizeSetting("transport.netty.receive_predictor_max", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> NETTY_BOSS_COUNT = Setting.intSetting("transport.netty.boss_count", 1, 1, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> NETWORK_SERVER = Setting.boolSetting("network.server", true, false, Setting.Scope.CLUSTER);
protected final NetworkService networkService;
protected final Version version;
@ -284,7 +285,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
boolean success = false;
try {
clientBootstrap = createClientBootstrap();
if (NETWORK_SERVER.get(settings)) {
if (NetworkService.NETWORK_SERVER.get(settings)) {
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
@ -356,25 +357,25 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
boolean tcpNoDelay = settings.getAsBoolean("transport.netty.tcp_no_delay", TCP_NO_DELAY.get(settings));
boolean tcpNoDelay = TCP_NO_DELAY.get(settings);
clientBootstrap.setOption("tcpNoDelay", tcpNoDelay);
boolean tcpKeepAlive = settings.getAsBoolean("transport.netty.tcp_keep_alive", TCP_KEEP_ALIVE.get(settings));
boolean tcpKeepAlive = TCP_KEEP_ALIVE.get(settings);
clientBootstrap.setOption("keepAlive", tcpKeepAlive);
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.get(settings));
ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
if (tcpSendBufferSize.bytes() > 0) {
clientBootstrap.setOption("sendBufferSize", tcpSendBufferSize.bytes());
}
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.get(settings));
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.bytes() > 0) {
clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
}
clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
boolean reuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TCP_REUSE_ADDRESS.get(settings));
boolean reuseAddress = TCP_REUSE_ADDRESS.get(settings);
clientBootstrap.setOption("reuseAddress", reuseAddress);
return clientBootstrap;
@ -383,31 +384,31 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private Settings createFallbackSettings() {
Settings.Builder fallbackSettingsBuilder = settingsBuilder();
String fallbackBindHost = settings.get("transport.netty.bind_host", settings.get("transport.bind_host", settings.get("transport.host")));
if (fallbackBindHost != null) {
fallbackSettingsBuilder.put("bind_host", fallbackBindHost);
List<String> fallbackBindHost = TransportSettings.BIND_HOST.get(settings);
if (fallbackBindHost.isEmpty() == false) {
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
}
String fallbackPublishHost = settings.get("transport.netty.publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
if (fallbackPublishHost != null) {
fallbackSettingsBuilder.put("publish_host", fallbackPublishHost);
List<String> fallbackPublishHost = TransportSettings.PUBLISH_HOST.get(settings);
if (fallbackPublishHost.isEmpty() == false) {
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
}
boolean fallbackTcpNoDelay = settings.getAsBoolean("transport.netty.tcp_no_delay", TCP_NO_DELAY.get(settings));
boolean fallbackTcpNoDelay = settings.getAsBoolean("transport.netty.tcp_no_delay", TcpSettings.TCP_NO_DELAY.get(settings));
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
boolean fallbackTcpKeepAlive = settings.getAsBoolean("transport.netty.tcp_keep_alive", TCP_KEEP_ALIVE.get(settings));
boolean fallbackTcpKeepAlive = settings.getAsBoolean("transport.netty.tcp_keep_alive", TcpSettings.TCP_KEEP_ALIVE.get(settings));
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TCP_REUSE_ADDRESS.get(settings));
boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TcpSettings.TCP_REUSE_ADDRESS.get(settings));
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.get(settings));
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE.get(settings));
if (fallbackTcpSendBufferSize.bytes() >= 0) {
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
}
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.get(settings));
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings));
if (fallbackTcpBufferSize.bytes() >= 0) {
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
}
@ -495,7 +496,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final String[] publishHosts;
if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
publishHosts = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null)));
publishHosts = TransportSettings.PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
} else {
publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings);
}
@ -507,15 +508,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
throw new BindTransportException("Failed to resolve publish address", e);
}
Integer publishPort;
int publishPort;
if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", null));
publishPort = TransportSettings.PUBLISH_PORT.get(settings);
} else {
publishPort = profileSettings.getAsInt("publish_port", null);
publishPort = profileSettings.getAsInt("publish_port", -1);
}
// if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress
if (publishPort == null) {
if (publishPort < 0) {
for (InetSocketAddress boundAddress : boundAddresses) {
InetAddress boundInetAddress = boundAddress.getAddress();
if (boundInetAddress.isAnyLocalAddress() || boundInetAddress.equals(publishInetAddress)) {
@ -526,7 +527,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
// if port still not matches, just take port of first bound address
if (publishPort == null) {
if (publishPort < 0) {
// TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address
// In case of a custom profile, we might use the publish address of the default profile
publishPort = boundAddresses.get(0).getPort();
@ -538,15 +539,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
private void createServerBootstrap(String name, Settings settings) {
boolean blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", TCP_BLOCKING_SERVER.get(settings));
boolean blockingServer = TCP_BLOCKING_SERVER.get(settings);
String port = settings.get("port");
String bindHost = settings.get("bind_host");
String publishHost = settings.get("publish_host");
String tcpNoDelay = settings.get("tcp_no_delay");
String tcpKeepAlive = settings.get("tcp_keep_alive");
boolean reuseAddress = settings.getAsBoolean("reuse_address", NetworkUtils.defaultReuseAddress());
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("tcp_send_buffer_size", TCP_SEND_BUFFER_SIZE.getDefault(settings));
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("tcp_receive_buffer_size", TCP_RECEIVE_BUFFER_SIZE.getDefault(settings));
ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings);
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings);
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);

View File

@ -52,7 +52,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
private final Settings settings = settingsBuilder()
.put("name", "foo")
.put("transport.host", "127.0.0.1")
.put(TransportSettings.BIND_HOST.getKey(), "127.0.0.1")
.put(TransportSettings.PORT.getKey(), "0")
.build();

View File

@ -115,9 +115,7 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource {
} else {
// we need to pin the node port & host so we'd know where to point things
builder.put(TransportSettings.PORT.getKey(), unicastHostPorts[nodeOrdinal]);
builder.put("transport.host", IP_ADDR); // only bind on one IF we use v4 here by default
builder.put("transport.bind_host", IP_ADDR);
builder.put("transport.publish_host", IP_ADDR);
builder.put(TransportSettings.HOST.getKey(), IP_ADDR); // only bind on one IF we use v4 here by default
builder.put("http.enabled", false);
for (int i = 0; i < unicastHostOrdinals.length; i++) {
unicastHosts[i] = IP_ADDR + ":" + (unicastHostPorts[unicastHostOrdinals[i]]);