diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Security.java b/core/src/main/java/org/elasticsearch/bootstrap/Security.java index 075456cd9e4..2b0812c5577 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/Security.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/Security.java @@ -23,6 +23,7 @@ import org.elasticsearch.SecureSM; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.http.HttpTransportSettings; @@ -320,30 +321,15 @@ final class Security { * @param policy the {@link Permissions} instance to apply the dynamic {@link SocketPermission}s to * @param settings the {@link Settings} instance to read the transport settings from */ - private static void addSocketPermissionForTransportProfiles( - final Permissions policy, - final Settings settings) { + private static void addSocketPermissionForTransportProfiles(final Permissions policy, final Settings settings) { // transport is way over-engineered - final Map profiles = new HashMap<>(TcpTransport.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups()); - profiles.putIfAbsent(TcpTransport.DEFAULT_PROFILE, Settings.EMPTY); - - // loop through all profiles and add permissions for each one, if it's valid; otherwise Netty transports are lenient and ignores it - for (final Map.Entry entry : profiles.entrySet()) { - final Settings profileSettings = entry.getValue(); - final String name = entry.getKey(); - - // a profile is only valid if it's the default profile, or if it has an actual name and specifies a port - // TODO: can this leniency be removed? - final boolean valid = - TcpTransport.DEFAULT_PROFILE.equals(name) || - (name != null && name.length() > 0 && profileSettings.get("port") != null); - if (valid) { - final String transportRange = profileSettings.get("port"); - if (transportRange != null) { - addSocketPermissionForPortRange(policy, transportRange); - } else { - addSocketPermissionForTransport(policy, settings); - } + Set profiles = TcpTransport.getProfileSettings(settings); + Set uniquePortRanges = new HashSet<>(); + // loop through all profiles and add permissions for each one + for (final TcpTransport.ProfileSettings profile : profiles) { + if (uniquePortRanges.add(profile.portOrRange)) { + // profiles fall back to the transport.port if it's not explicit but we want to only add one permission per range + addSocketPermissionForPortRange(policy, profile.portOrRange); } } } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e478ea49a01..d70415718e3 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -270,12 +270,20 @@ public final class ClusterSettings extends AbstractScopedSettings { HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, Transport.TRANSPORT_TCP_COMPRESS, - TcpTransport.TRANSPORT_PROFILES_SETTING, TcpTransport.HOST, TcpTransport.PUBLISH_HOST, TcpTransport.BIND_HOST, TcpTransport.PUBLISH_PORT, TcpTransport.PORT, + TcpTransport.BIND_HOST_PROFILE, + TcpTransport.PUBLISH_HOST_PROFILE, + TcpTransport.PUBLISH_PORT_PROFILE, + TcpTransport.PORT_PROFILE, + TcpTransport.TCP_NO_DELAY_PROFILE, + TcpTransport.TCP_KEEP_ALIVE_PROFILE, + TcpTransport.TCP_REUSE_ADDRESS_PROFILE, + TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE, + TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE, TcpTransport.CONNECTIONS_PER_NODE_RECOVERY, TcpTransport.CONNECTIONS_PER_NODE_BULK, TcpTransport.CONNECTIONS_PER_NODE_REG, diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index f03b949c581..7e1d3422de9 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -108,6 +108,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.settings.Setting.affixKeySetting; import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.settings.Setting.groupSetting; import static org.elasticsearch.common.settings.Setting.intSetting; @@ -133,8 +134,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i public static final Setting PUBLISH_PORT = intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope); public static final String DEFAULT_PROFILE = "default"; - public static final Setting TRANSPORT_PROFILES_SETTING = - groupSetting("transport.profiles.", Setting.Property.Dynamic, Setting.Property.NodeScope); // the scheduled internal ping interval setting, defaults to disabled (-1) public static final Setting PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope); @@ -164,6 +163,26 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i Setting.Property.NodeScope); + public static final Setting.AffixSetting TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay", + key -> boolSetting(key, TcpTransport.TCP_NO_DELAY, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_KEEP_ALIVE_PROFILE = affixKeySetting("transport.profiles.", "tcp_keep_alive", + key -> boolSetting(key, TcpTransport.TCP_KEEP_ALIVE, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_REUSE_ADDRESS_PROFILE = affixKeySetting("transport.profiles.", "reuse_address", + key -> boolSetting(key, TcpTransport.TCP_REUSE_ADDRESS, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_SEND_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.", + "send_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_RECEIVE_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.", + "receive_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope)); + + public static final Setting.AffixSetting> BIND_HOST_PROFILE = affixKeySetting("transport.profiles.", "bind_host", + key -> listSetting(key, BIND_HOST, Function.identity(), Setting.Property.NodeScope)); + public static final Setting.AffixSetting> PUBLISH_HOST_PROFILE = affixKeySetting("transport.profiles.", "publish_host", + key -> listSetting(key, PUBLISH_HOST, Function.identity(), Setting.Property.NodeScope)); + public static final Setting.AffixSetting PORT_PROFILE = affixKeySetting("transport.profiles.", "port", + key -> new Setting(key, PORT, Function.identity(), Setting.Property.NodeScope)); + public static final Setting.AffixSetting PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port", + key -> intSetting(key, -1, -1, Setting.Property.NodeScope)); + private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final int PING_DATA_SIZE = -1; private final CircuitBreakerService circuitBreakerService; @@ -173,6 +192,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final ThreadPool threadPool; private final BigArrays bigArrays; protected final NetworkService networkService; + protected final Set profileSettings; protected volatile TransportServiceAdapter transportServiceAdapter; // node id to actual channel @@ -204,6 +224,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { super(settings); + this.profileSettings = getProfileSettings(settings); this.threadPool = threadPool; this.bigArrays = bigArrays; this.circuitBreakerService = circuitBreakerService; @@ -664,43 +685,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return unmodifiableMap(new HashMap<>(profileBoundAddresses)); } - protected Map buildProfileSettings() { - // extract default profile first and create standard bootstrap - Map profiles = TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true); - if (!profiles.containsKey(DEFAULT_PROFILE)) { - profiles = new HashMap<>(profiles); - profiles.put(DEFAULT_PROFILE, Settings.EMPTY); - } - Settings defaultSettings = profiles.get(DEFAULT_PROFILE); - Map result = new HashMap<>(); - // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : profiles.entrySet()) { - Settings profileSettings = entry.getValue(); - String name = entry.getKey(); - - if (!Strings.hasLength(name)) { - logger.info("transport profile configured without a name. skipping profile with settings [{}]", - profileSettings.toDelimitedString(',')); - continue; - } else if (DEFAULT_PROFILE.equals(name)) { - profileSettings = Settings.builder() - .put(profileSettings) - .put("port", profileSettings.get("port", PORT.get(this.settings))) - .build(); - } else if (profileSettings.get("port") == null) { - // if profile does not have a port, skip it - logger.info("No port configured for profile [{}], not binding", name); - continue; - } - Settings mergedSettings = Settings.builder() - .put(defaultSettings.getAsMap()) - .put(profileSettings.getAsMap()) - .build(); - result.put(name, mergedSettings); - } - return result; - } - @Override public List getLocalAddresses() { List local = new ArrayList<>(); @@ -712,15 +696,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return local; } - protected void bindServer(final String name, final Settings profileSettings) { + protected void bindServer(ProfileSettings profileSettings) { // Bind and start to accept incoming connections. InetAddress hostAddresses[]; - String bindHosts[] = profileSettings.getAsArray("bind_host", - NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY)); + List profileBindHosts = profileSettings.bindHosts; try { - hostAddresses = networkService.resolveBindHostAddresses(bindHosts); + hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY)); } catch (IOException e) { - throw new BindTransportException("Failed to resolve host " + Arrays.toString(bindHosts), e); + throw new BindTransportException("Failed to resolve host " + profileBindHosts, e); } if (logger.isDebugEnabled()) { String[] addresses = new String[hostAddresses.length]; @@ -734,15 +717,15 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i List boundAddresses = new ArrayList<>(); for (InetAddress hostAddress : hostAddresses) { - boundAddresses.add(bindToPort(name, hostAddress, profileSettings.get("port"))); + boundAddresses.add(bindToPort(profileSettings.profileName, hostAddress, profileSettings.portOrRange)); } - final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, profileSettings, boundAddresses); + final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(profileSettings, boundAddresses); - if (DEFAULT_PROFILE.equals(name)) { + if (profileSettings.isDefaultProfile) { this.boundAddress = boundTransportAddress; } else { - profileBoundAddresses.put(name, boundTransportAddress); + profileBoundAddresses.put(profileSettings.profileName, boundTransportAddress); } } @@ -779,7 +762,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return boundSocket.get(); } - private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, + private BoundTransportAddress createBoundTransportAddress(ProfileSettings profileSettings, List boundAddresses) { String[] boundAddressesHostStrings = new String[boundAddresses.size()]; TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()]; @@ -789,37 +772,30 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i transportBoundAddresses[i] = new TransportAddress(boundAddress); } - String[] publishHosts; - if (DEFAULT_PROFILE.equals(name)) { - publishHosts = PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY); - } else { - publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings); + List publishHosts = profileSettings.publishHosts; + if (profileSettings.isDefaultProfile == false && publishHosts.isEmpty()) { + publishHosts = Arrays.asList(boundAddressesHostStrings); } - if (publishHosts == null || publishHosts.length == 0) { - publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); + if (publishHosts.isEmpty()) { + publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings); } final InetAddress publishInetAddress; try { - publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts); + publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts.toArray(Strings.EMPTY_ARRAY)); } catch (Exception e) { throw new BindTransportException("Failed to resolve publish address", e); } - final int publishPort = resolvePublishPort(name, settings, profileSettings, boundAddresses, publishInetAddress); + final int publishPort = resolvePublishPort(profileSettings, boundAddresses, publishInetAddress); final TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); return new BoundTransportAddress(transportBoundAddresses, publishAddress); } // package private for tests - public static int resolvePublishPort(String profileName, Settings settings, Settings profileSettings, - List boundAddresses, InetAddress publishInetAddress) { - int publishPort; - if (DEFAULT_PROFILE.equals(profileName)) { - publishPort = PUBLISH_PORT.get(settings); - } else { - publishPort = profileSettings.getAsInt("publish_port", -1); - } + public static int resolvePublishPort(ProfileSettings profileSettings, List boundAddresses, + InetAddress publishInetAddress) { + int publishPort = profileSettings.publishPort; // if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress if (publishPort < 0) { @@ -844,7 +820,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } if (publishPort < 0) { - String profileExplanation = DEFAULT_PROFILE.equals(profileName) ? "" : " for profile " + profileName; + String profileExplanation = profileSettings.isDefaultProfile ? "" : " for profile " + profileSettings.profileName; throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " + boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " + "Please specify a unique port by setting " + PORT.getKey() + " or " + @@ -1729,4 +1705,61 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), transmittedBytesMetric.sum()); } + + /** + * Returns all profile settings for the given settings object + */ + public static Set getProfileSettings(Settings settings) { + HashSet profiles = new HashSet<>(); + boolean isDefaultSet = false; + for (String profile : settings.getGroups("transport.profiles.", true).keySet()) { + profiles.add(new ProfileSettings(settings, profile)); + if (DEFAULT_PROFILE.equals(profile)) { + isDefaultSet = true; + } + } + if (isDefaultSet == false) { + profiles.add(new ProfileSettings(settings, DEFAULT_PROFILE)); + } + return Collections.unmodifiableSet(profiles); + } + + /** + * Representation of a transport profile settings for a transport.profiles.$profilename.* + */ + public static final class ProfileSettings { + public final String profileName; + public final boolean tcpNoDelay; + public final boolean tcpKeepAlive; + public final boolean reuseAddress; + public final ByteSizeValue sendBufferSize; + public final ByteSizeValue receiveBufferSize; + public final List bindHosts; + public final List publishHosts; + public final String portOrRange; + public final int publishPort; + public final boolean isDefaultProfile; + + public ProfileSettings(Settings settings, String profileName) { + this.profileName = profileName; + isDefaultProfile = DEFAULT_PROFILE.equals(profileName); + tcpKeepAlive = TCP_KEEP_ALIVE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + tcpNoDelay = TCP_NO_DELAY_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + reuseAddress = TCP_REUSE_ADDRESS_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + sendBufferSize = TCP_SEND_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + receiveBufferSize = TCP_RECEIVE_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + List profileBindHosts = BIND_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + bindHosts = (profileBindHosts.isEmpty() ? NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings) + : profileBindHosts); + publishHosts = PUBLISH_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + Setting concretePort = PORT_PROFILE.getConcreteSettingForNamespace(profileName); + if (concretePort.exists(settings) == false && isDefaultProfile == false) { + throw new IllegalStateException("profile [" + profileName + "] has no port configured"); + } + portOrRange = PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + publishPort = isDefaultProfile ? PUBLISH_PORT.get(settings) : + PUBLISH_PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java b/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java index 42cc9b876ab..0f121f0c401 100644 --- a/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java +++ b/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java @@ -42,34 +42,39 @@ public class PublishPortTests extends ESTestCase { boolean useProfile = randomBoolean(); final String profile; - final Settings settings; - final Settings profileSettings; + Settings baseSettings; + Settings settings; if (useProfile) { - profile = "some_profile"; + baseSettings = Settings.builder().put("transport.profiles.some_profile.port", 0).build(); settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build(); - profileSettings = Settings.builder().put("publish_port", 9080).build(); + settings = Settings.builder().put(settings).put(baseSettings).put("transport.profiles.some_profile.publish_port", 9080).build(); + profile = "some_profile"; + } else { - profile = TcpTransport.DEFAULT_PROFILE; + baseSettings = Settings.EMPTY; settings = Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build(); - profileSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("publish_port", 9080).build();; + settings = randomBoolean() ? settings : + Settings.builder().put(settings).put("transport.profiles.default.publish_port", 9080).build(); + profile = "default"; + } - int publishPort = resolvePublishPort(profile, settings, profileSettings, + int publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(settings, profile), randomAddresses(), getByName("127.0.0.2")); assertThat("Publish port should be explicitly set", publishPort, equalTo(useProfile ? 9080 : 9081)); - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)), getByName("127.0.0.1")); assertThat("Publish port should be derived from matched address", publishPort, equalTo(boundPort)); - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("127.0.0.1", boundPort), address("127.0.0.2", boundPort)), getByName("127.0.0.3")); assertThat("Publish port should be derived from unique port of bound addresses", publishPort, equalTo(boundPort)); try { - resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)), getByName("127.0.0.3")); fail("Expected BindTransportException as publish_port not specified and non-unique port of bound addresses"); @@ -77,13 +82,13 @@ public class PublishPortTests extends ESTestCase { assertThat(e.getMessage(), containsString("Failed to auto-resolve publish port")); } - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)), getByName("127.0.0.1")); assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort)); if (NetworkUtils.SUPPORTS_V6) { - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)), getByName("::1")); assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort)); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 5e12116f00a..5623005f7d9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -62,7 +62,6 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; import java.net.InetSocketAddress; @@ -71,6 +70,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -142,10 +142,6 @@ public class Netty4Transport extends TcpTransport { } } - TransportServiceAdapter transportServiceAdapter() { - return transportServiceAdapter; - } - @Override protected void doStart() { boolean success = false; @@ -154,14 +150,9 @@ public class Netty4Transport extends TcpTransport { if (NetworkService.NETWORK_SERVER.get(settings)) { final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger); this.serverOpenChannels = openChannels; - // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : buildProfileSettings().entrySet()) { - // merge fallback settings with default settings with profile settings so we have complete settings with default values - final Settings settings = Settings.builder() - .put(createFallbackSettings()) - .put(entry.getValue()).build(); - createServerBootstrap(entry.getKey(), settings); - bindServer(entry.getKey(), settings); + for (ProfileSettings profileSettings : profileSettings) { + createServerBootstrap(profileSettings); + bindServer(profileSettings); } } super.doStart(); @@ -204,46 +195,12 @@ public class Netty4Transport extends TcpTransport { return bootstrap; } - private Settings createFallbackSettings() { - Settings.Builder fallbackSettingsBuilder = Settings.builder(); - - List fallbackBindHost = TcpTransport.BIND_HOST.get(settings); - if (fallbackBindHost.isEmpty() == false) { - fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost); - } - - List fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings); - if (fallbackPublishHost.isEmpty() == false) { - fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost); - } - - boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings); - fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay); - - boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings); - fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive); - - boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings); - fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress); - - ByteSizeValue fallbackTcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings); - if (fallbackTcpSendBufferSize.getBytes() >= 0) { - fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize); - } - - ByteSizeValue fallbackTcpBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings); - if (fallbackTcpBufferSize.getBytes() >= 0) { - fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize); - } - - return fallbackSettingsBuilder.build(); - } - - private void createServerBootstrap(String name, Settings settings) { + private void createServerBootstrap(ProfileSettings profileSettings) { + String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", - name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress, + name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress, defaultConnectionProfile.getConnectTimeout(), defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY), defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK), @@ -253,6 +210,7 @@ public class Netty4Transport extends TcpTransport { receivePredictorMin, receivePredictorMax); } + final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name); final ServerBootstrap serverBootstrap = new ServerBootstrap(); @@ -260,34 +218,31 @@ public class Netty4Transport extends TcpTransport { serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory)); serverBootstrap.channel(NioServerSocketChannel.class); - serverBootstrap.childHandler(getServerChannelInitializer(name, settings)); + serverBootstrap.childHandler(getServerChannelInitializer(name)); - serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); - serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); + serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay); + serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive); - final ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings); - if (tcpSendBufferSize != null && tcpSendBufferSize.getBytes() > 0) { - serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes())); + if (profileSettings.sendBufferSize.getBytes() != -1) { + serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes())); } - final ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings); - if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.getBytes() > 0) { - serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.bytesAsInt())); + if (profileSettings.receiveBufferSize.getBytes() != -1) { + serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt())); } serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); - final boolean reuseAddress = TCP_REUSE_ADDRESS.get(settings); - serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); - serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); + serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress); + serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress); serverBootstrap.validate(); serverBootstraps.put(name, serverBootstrap); } - protected ChannelHandler getServerChannelInitializer(String name, Settings settings) { - return new ServerChannelInitializer(name, settings); + protected ChannelHandler getServerChannelInitializer(String name) { + return new ServerChannelInitializer(name); } protected ChannelHandler getClientChannelInitializer() { @@ -455,11 +410,9 @@ public class Netty4Transport extends TcpTransport { protected class ServerChannelInitializer extends ChannelInitializer { protected final String name; - protected final Settings settings; - protected ServerChannelInitializer(String name, Settings settings) { + protected ServerChannelInitializer(String name) { this.name = name; - this.settings = settings; } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index dba5ca2d82d..295e7ab389c 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -92,9 +92,9 @@ public class NettyTransportMultiPortTests extends ESTestCase { .build(); ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { - assertEquals(0, transport.profileBoundAddresses().size()); - assertEquals(1, transport.boundAddress().boundAddresses().length); + try { + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> startTransport(settings, threadPool)); + assertEquals("profile [client1] has no port configured", ex.getMessage()); } finally { terminate(threadPool); } @@ -116,24 +116,6 @@ public class NettyTransportMultiPortTests extends ESTestCase { } } - public void testThatProfileWithoutValidNameIsIgnored() throws Exception { - Settings settings = Settings.builder() - .put("network.host", host) - .put(TcpTransport.PORT.getKey(), 0) - // mimics someone trying to define a profile for .local which is the profile for a node request to itself - .put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", 22) // will not actually bind to this - .put("transport.profiles..port", 23) // will not actually bind to this - .build(); - - ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { - assertEquals(0, transport.profileBoundAddresses().size()); - assertEquals(1, transport.boundAddress().boundAddresses().length); - } finally { - terminate(threadPool); - } - } - private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()), @@ -143,5 +125,4 @@ public class NettyTransportMultiPortTests extends ESTestCase { assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); return transport; } - } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index ef1e127930c..c30731b9b88 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -30,12 +30,15 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -54,6 +57,8 @@ import org.junit.Before; import java.io.IOException; import java.io.UncheckedIOException; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -75,6 +80,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collector; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -2419,4 +2426,176 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceC.close(); } } + + public void testTransportProfilesWithPortAndHost() { + boolean doIPV6 = NetworkUtils.SUPPORTS_V6; + List hosts; + if (doIPV6) { + hosts = Arrays.asList("_local:ipv6_", "_local:ipv4_"); + } else { + hosts = Arrays.asList("_local:ipv4_"); + } + try (MockTransportService serviceC = build(Settings.builder() + .put("name", "TS_TEST") + .put("transport.profiles.default.bind_host", "_local:ipv4_") + .put("transport.profiles.some_profile.port", "8900-9000") + .put("transport.profiles.some_profile.bind_host", "_local:ipv4_") + .put("transport.profiles.some_other_profile.port", "8700-8800") + .putArray("transport.profiles.some_other_profile.bind_host", hosts) + .putArray("transport.profiles.some_other_profile.publish_host", "_local:ipv4_") + .build(), version0, null, true)) { + + serviceC.start(); + serviceC.acceptIncomingRequests(); + Map profileBoundAddresses = serviceC.transport.profileBoundAddresses(); + assertTrue(profileBoundAddresses.containsKey("some_profile")); + assertTrue(profileBoundAddresses.containsKey("some_other_profile")); + assertTrue(profileBoundAddresses.get("some_profile").publishAddress().getPort() >= 8900); + assertTrue(profileBoundAddresses.get("some_profile").publishAddress().getPort() < 9000); + assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().getPort() >= 8700); + assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().getPort() < 8800); + assertEquals(profileBoundAddresses.get("some_profile").boundAddresses().length, 1); + if (doIPV6) { + assertTrue(profileBoundAddresses.get("some_other_profile").boundAddresses().length >= 2); + int ipv4 = 0; + int ipv6 = 0; + for (TransportAddress addr : profileBoundAddresses.get("some_other_profile").boundAddresses()) { + if (addr.address().getAddress() instanceof Inet4Address) { + ipv4++; + } else if (addr.address().getAddress() instanceof Inet6Address) { + ipv6++; + } else { + fail("what kind of address is this: " + addr.address().getAddress()); + } + } + assertTrue("num ipv4 is wrong: " + ipv4, ipv4 >= 1); + assertTrue("num ipv6 is wrong: " + ipv6, ipv6 >= 1); + } else { + assertEquals(profileBoundAddresses.get("some_other_profile").boundAddresses().length, 1); + } + assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().address().getAddress() instanceof Inet4Address); + } + } + + public void testProfileSettings() { + boolean enable = randomBoolean(); + Settings globalSettings = Settings.builder() + .put("network.tcp.no_delay", enable) + .put("network.tcp.keep_alive", enable) + .put("network.tcp.reuse_address", enable) + .put("network.tcp.send_buffer_size", "43000b") + .put("network.tcp.receive_buffer_size", "42000b") + .put("network.publish_host", "the_publish_host") + .put("network.bind_host", "the_bind_host") + .build(); + + Settings globalSettings2 = Settings.builder() + .put("network.tcp.no_delay", !enable) + .put("network.tcp.keep_alive", !enable) + .put("network.tcp.reuse_address", !enable) + .put("network.tcp.send_buffer_size", "4b") + .put("network.tcp.receive_buffer_size", "3b") + .put("network.publish_host", "another_publish_host") + .put("network.bind_host", "another_bind_host") + .build(); + + Settings transportSettings = Settings.builder() + .put("transport.tcp_no_delay", enable) + .put("transport.tcp.keep_alive", enable) + .put("transport.tcp.reuse_address", enable) + .put("transport.tcp.send_buffer_size", "43000b") + .put("transport.tcp.receive_buffer_size", "42000b") + .put("transport.publish_host", "the_publish_host") + .put("transport.tcp.port", "9700-9800") + .put("transport.bind_host", "the_bind_host") + .put(globalSettings2) + .build(); + + Settings transportSettings2 = Settings.builder() + .put("transport.tcp_no_delay", !enable) + .put("transport.tcp.keep_alive", !enable) + .put("transport.tcp.reuse_address", !enable) + .put("transport.tcp.send_buffer_size", "5b") + .put("transport.tcp.receive_buffer_size", "6b") + .put("transport.publish_host", "another_publish_host") + .put("transport.tcp.port", "9702-9802") + .put("transport.bind_host", "another_bind_host") + .put(globalSettings2) + .build(); + Settings defaultProfileSettings = Settings.builder() + .put("transport.profiles.default.tcp_no_delay", enable) + .put("transport.profiles.default.tcp_keep_alive", enable) + .put("transport.profiles.default.reuse_address", enable) + .put("transport.profiles.default.send_buffer_size", "43000b") + .put("transport.profiles.default.receive_buffer_size", "42000b") + .put("transport.profiles.default.port", "9700-9800") + .put("transport.profiles.default.publish_host", "the_publish_host") + .put("transport.profiles.default.bind_host", "the_bind_host") + .put("transport.profiles.default.publish_port", 42) + .put(randomBoolean() ? transportSettings2 : globalSettings2) // ensure that we have profile precedence + .build(); + + Settings profileSettings = Settings.builder() + .put("transport.profiles.some_profile.tcp_no_delay", enable) + .put("transport.profiles.some_profile.tcp_keep_alive", enable) + .put("transport.profiles.some_profile.reuse_address", enable) + .put("transport.profiles.some_profile.send_buffer_size", "43000b") + .put("transport.profiles.some_profile.receive_buffer_size", "42000b") + .put("transport.profiles.some_profile.port", "9700-9800") + .put("transport.profiles.some_profile.publish_host", "the_publish_host") + .put("transport.profiles.some_profile.bind_host", "the_bind_host") + .put("transport.profiles.some_profile.publish_port", 42) + .put(randomBoolean() ? transportSettings2 : globalSettings2) // ensure that we have profile precedence + .put(randomBoolean() ? defaultProfileSettings : Settings.EMPTY) + .build(); + + Settings randomSettings = randomFrom(random(), globalSettings, transportSettings, profileSettings); + ClusterSettings clusterSettings = new ClusterSettings(randomSettings, ClusterSettings + .BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.validate(randomSettings); + TcpTransport.ProfileSettings settings = new TcpTransport.ProfileSettings( + Settings.builder().put(randomSettings).put("transport.profiles.some_profile.port", "9700-9800").build(), // port is required + "some_profile"); + + assertEquals(enable, settings.tcpNoDelay); + assertEquals(enable, settings.tcpKeepAlive); + assertEquals(enable, settings.reuseAddress); + assertEquals(43000, settings.sendBufferSize.getBytes()); + assertEquals(42000, settings.receiveBufferSize.getBytes()); + if (randomSettings == profileSettings) { + assertEquals(42, settings.publishPort); + } else { + assertEquals(-1, settings.publishPort); + } + + if (randomSettings == globalSettings) { // publish host has no global fallback for the profile since we later resolve it based on + // the bound address + assertEquals(Collections.emptyList(), settings.publishHosts); + } else { + assertEquals(Collections.singletonList("the_publish_host"), settings.publishHosts); + } + assertEquals("9700-9800", settings.portOrRange); + assertEquals(Collections.singletonList("the_bind_host"), settings.bindHosts); + } + + public void testProfilesIncludesDefault() { + Set profileSettings = TcpTransport.getProfileSettings(Settings.EMPTY); + assertEquals(1, profileSettings.size()); + assertEquals(TcpTransport.DEFAULT_PROFILE, profileSettings.stream().findAny().get().profileName); + + profileSettings = TcpTransport.getProfileSettings(Settings.builder() + .put("transport.profiles.test.port", "0") + .build()); + assertEquals(2, profileSettings.size()); + assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors + .toSet())); + + profileSettings = TcpTransport.getProfileSettings(Settings.builder() + .put("transport.profiles.test.port", "0") + .put("transport.profiles.default.port", "0") + .build()); + assertEquals(2, profileSettings.size()); + assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors + .toSet())); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 381520d4627..f2849705e05 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -389,10 +389,8 @@ public class MockTcpTransport extends TcpTransport try { if (NetworkService.NETWORK_SERVER.get(settings)) { // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : buildProfileSettings().entrySet()) { - final Settings profileSettings = Settings.builder() - .put(entry.getValue()).build(); - bindServer(entry.getKey(), profileSettings); + for (ProfileSettings profileSettings : profileSettings) { + bindServer(profileSettings); } } super.doStart(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index a7c7d187806..335d438f577 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -45,7 +45,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; @@ -69,7 +69,6 @@ public class NioTransport extends TcpTransport { intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); private final TcpReadHandler tcpReadHandler = new TcpReadHandler(this); - private final BigArrays bigArrays; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private final OpenChannels openChannels = new OpenChannels(logger); private final ArrayList acceptors = new ArrayList<>(); @@ -80,7 +79,6 @@ public class NioTransport extends TcpTransport { public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.bigArrays = bigArrays; } @Override @@ -175,13 +173,9 @@ public class NioTransport extends TcpTransport { acceptors.add(acceptor); } // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : buildProfileSettings().entrySet()) { - // merge fallback settings with default settings with profile settings so we have complete settings with default values - final Settings profileSettings = Settings.builder() - .put(createFallbackSettings()) - .put(entry.getValue()).build(); - profileToChannelFactory.putIfAbsent(entry.getKey(), new ChannelFactory(profileSettings, tcpReadHandler)); - bindServer(entry.getKey(), profileSettings); + for (ProfileSettings profileSettings : profileSettings) { + profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, tcpReadHandler)); + bindServer(profileSettings); } } client = createClient(); @@ -269,7 +263,7 @@ public class NioTransport extends TcpTransport { private NioClient createClient() { Supplier selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); - ChannelFactory channelFactory = new ChannelFactory(settings, tcpReadHandler); + ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler); return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index 84c36d41104..420af25fe6a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -19,8 +19,6 @@ package org.elasticsearch.transport.nio.channel; -import org.elasticsearch.common.CheckedSupplier; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.TcpReadHandler; @@ -31,9 +29,6 @@ import java.net.ServerSocket; import java.net.Socket; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.security.AccessController; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; public class ChannelFactory { @@ -44,12 +39,12 @@ public class ChannelFactory { private final int tcpReceiveBufferSize; private final TcpReadHandler handler; - public ChannelFactory(Settings settings, TcpReadHandler handler) { - tcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings); - tcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings); - tcpReusedAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings); - tcpSendBufferSize = Math.toIntExact(TcpTransport.TCP_SEND_BUFFER_SIZE.get(settings).getBytes()); - tcpReceiveBufferSize = Math.toIntExact(TcpTransport.TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes()); + public ChannelFactory(TcpTransport.ProfileSettings profileSettings, TcpReadHandler handler) { + tcpNoDelay = profileSettings.tcpNoDelay; + tcpKeepAlive = profileSettings.tcpKeepAlive; + tcpReusedAddress = profileSettings.reuseAddress; + tcpSendBufferSize = Math.toIntExact(profileSettings.sendBufferSize.getBytes()); + tcpReceiveBufferSize = Math.toIntExact(profileSettings.receiveBufferSize.getBytes()); this.handler = handler; } @@ -94,12 +89,4 @@ public class ChannelFactory { socket.setSendBufferSize(tcpReceiveBufferSize); } } - - private static T getSocketChannel(CheckedSupplier supplier) throws IOException { - try { - return AccessController.doPrivileged((PrivilegedExceptionAction) supplier::get); - } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); - } - } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java index c3909a06440..7db9f48ca45 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.After; import org.junit.Before; @@ -39,7 +40,8 @@ import static org.mockito.Mockito.mock; public abstract class AbstractNioChannelTestCase extends ESTestCase { - ChannelFactory channelFactory = new ChannelFactory(Settings.EMPTY, mock(TcpReadHandler.class)); + ChannelFactory channelFactory = new ChannelFactory(new TcpTransport.ProfileSettings(Settings.EMPTY, "default"), + mock(TcpReadHandler.class)); MockServerSocket mockServerSocket; private Thread serverThread;