From 1f67d079b145a26036c226c847f348504c273df2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 7 Jul 2017 09:40:59 +0200 Subject: [PATCH] Validate `transport.profiles.*` settings (#25508) Transport profiles unfortunately have never been validated. Yet, it's very easy to make a mistake when configuring profiles which will most likely stay undetected since we don't validate the settings but allow almost everything based on the wildcard in `transport.profiles.*`. This change removes the settings subset based parsing of profiles but rather uses concrete affix settings for the profiles which makes it easier to fall back to higher level settings since the fallback settings are present when the profile setting is parsed. Previously, it was unclear in the code which setting is used ie. if the profiles settings (with removed prefixes) or the global node setting. There is no distinction anymore since we don't pull prefix based settings. --- .../org/elasticsearch/bootstrap/Security.java | 32 +--- .../common/settings/ClusterSettings.java | 10 +- .../elasticsearch/transport/TcpTransport.java | 167 +++++++++------- .../transport/PublishPortTests.java | 29 +-- .../transport/netty4/Netty4Transport.java | 87 ++------- .../netty4/NettyTransportMultiPortTests.java | 25 +-- .../AbstractSimpleTransportTestCase.java | 179 ++++++++++++++++++ .../transport/MockTcpTransport.java | 6 +- .../transport/nio/NioTransport.java | 16 +- .../transport/nio/channel/ChannelFactory.java | 25 +-- .../channel/AbstractNioChannelTestCase.java | 4 +- 11 files changed, 353 insertions(+), 227 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Security.java b/core/src/main/java/org/elasticsearch/bootstrap/Security.java index 075456cd9e4..2b0812c5577 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/Security.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/Security.java @@ -23,6 +23,7 @@ import org.elasticsearch.SecureSM; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.http.HttpTransportSettings; @@ -320,30 +321,15 @@ final class Security { * @param policy the {@link Permissions} instance to apply the dynamic {@link SocketPermission}s to * @param settings the {@link Settings} instance to read the transport settings from */ - private static void addSocketPermissionForTransportProfiles( - final Permissions policy, - final Settings settings) { + private static void addSocketPermissionForTransportProfiles(final Permissions policy, final Settings settings) { // transport is way over-engineered - final Map profiles = new HashMap<>(TcpTransport.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups()); - profiles.putIfAbsent(TcpTransport.DEFAULT_PROFILE, Settings.EMPTY); - - // loop through all profiles and add permissions for each one, if it's valid; otherwise Netty transports are lenient and ignores it - for (final Map.Entry entry : profiles.entrySet()) { - final Settings profileSettings = entry.getValue(); - final String name = entry.getKey(); - - // a profile is only valid if it's the default profile, or if it has an actual name and specifies a port - // TODO: can this leniency be removed? - final boolean valid = - TcpTransport.DEFAULT_PROFILE.equals(name) || - (name != null && name.length() > 0 && profileSettings.get("port") != null); - if (valid) { - final String transportRange = profileSettings.get("port"); - if (transportRange != null) { - addSocketPermissionForPortRange(policy, transportRange); - } else { - addSocketPermissionForTransport(policy, settings); - } + Set profiles = TcpTransport.getProfileSettings(settings); + Set uniquePortRanges = new HashSet<>(); + // loop through all profiles and add permissions for each one + for (final TcpTransport.ProfileSettings profile : profiles) { + if (uniquePortRanges.add(profile.portOrRange)) { + // profiles fall back to the transport.port if it's not explicit but we want to only add one permission per range + addSocketPermissionForPortRange(policy, profile.portOrRange); } } } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e478ea49a01..d70415718e3 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -270,12 +270,20 @@ public final class ClusterSettings extends AbstractScopedSettings { HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, Transport.TRANSPORT_TCP_COMPRESS, - TcpTransport.TRANSPORT_PROFILES_SETTING, TcpTransport.HOST, TcpTransport.PUBLISH_HOST, TcpTransport.BIND_HOST, TcpTransport.PUBLISH_PORT, TcpTransport.PORT, + TcpTransport.BIND_HOST_PROFILE, + TcpTransport.PUBLISH_HOST_PROFILE, + TcpTransport.PUBLISH_PORT_PROFILE, + TcpTransport.PORT_PROFILE, + TcpTransport.TCP_NO_DELAY_PROFILE, + TcpTransport.TCP_KEEP_ALIVE_PROFILE, + TcpTransport.TCP_REUSE_ADDRESS_PROFILE, + TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE, + TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE, TcpTransport.CONNECTIONS_PER_NODE_RECOVERY, TcpTransport.CONNECTIONS_PER_NODE_BULK, TcpTransport.CONNECTIONS_PER_NODE_REG, diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index f03b949c581..7e1d3422de9 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -108,6 +108,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.settings.Setting.affixKeySetting; import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.settings.Setting.groupSetting; import static org.elasticsearch.common.settings.Setting.intSetting; @@ -133,8 +134,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i public static final Setting PUBLISH_PORT = intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope); public static final String DEFAULT_PROFILE = "default"; - public static final Setting TRANSPORT_PROFILES_SETTING = - groupSetting("transport.profiles.", Setting.Property.Dynamic, Setting.Property.NodeScope); // the scheduled internal ping interval setting, defaults to disabled (-1) public static final Setting PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope); @@ -164,6 +163,26 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i Setting.Property.NodeScope); + public static final Setting.AffixSetting TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay", + key -> boolSetting(key, TcpTransport.TCP_NO_DELAY, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_KEEP_ALIVE_PROFILE = affixKeySetting("transport.profiles.", "tcp_keep_alive", + key -> boolSetting(key, TcpTransport.TCP_KEEP_ALIVE, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_REUSE_ADDRESS_PROFILE = affixKeySetting("transport.profiles.", "reuse_address", + key -> boolSetting(key, TcpTransport.TCP_REUSE_ADDRESS, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_SEND_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.", + "send_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope)); + public static final Setting.AffixSetting TCP_RECEIVE_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.", + "receive_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope)); + + public static final Setting.AffixSetting> BIND_HOST_PROFILE = affixKeySetting("transport.profiles.", "bind_host", + key -> listSetting(key, BIND_HOST, Function.identity(), Setting.Property.NodeScope)); + public static final Setting.AffixSetting> PUBLISH_HOST_PROFILE = affixKeySetting("transport.profiles.", "publish_host", + key -> listSetting(key, PUBLISH_HOST, Function.identity(), Setting.Property.NodeScope)); + public static final Setting.AffixSetting PORT_PROFILE = affixKeySetting("transport.profiles.", "port", + key -> new Setting(key, PORT, Function.identity(), Setting.Property.NodeScope)); + public static final Setting.AffixSetting PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port", + key -> intSetting(key, -1, -1, Setting.Property.NodeScope)); + private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final int PING_DATA_SIZE = -1; private final CircuitBreakerService circuitBreakerService; @@ -173,6 +192,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final ThreadPool threadPool; private final BigArrays bigArrays; protected final NetworkService networkService; + protected final Set profileSettings; protected volatile TransportServiceAdapter transportServiceAdapter; // node id to actual channel @@ -204,6 +224,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { super(settings); + this.profileSettings = getProfileSettings(settings); this.threadPool = threadPool; this.bigArrays = bigArrays; this.circuitBreakerService = circuitBreakerService; @@ -664,43 +685,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return unmodifiableMap(new HashMap<>(profileBoundAddresses)); } - protected Map buildProfileSettings() { - // extract default profile first and create standard bootstrap - Map profiles = TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true); - if (!profiles.containsKey(DEFAULT_PROFILE)) { - profiles = new HashMap<>(profiles); - profiles.put(DEFAULT_PROFILE, Settings.EMPTY); - } - Settings defaultSettings = profiles.get(DEFAULT_PROFILE); - Map result = new HashMap<>(); - // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : profiles.entrySet()) { - Settings profileSettings = entry.getValue(); - String name = entry.getKey(); - - if (!Strings.hasLength(name)) { - logger.info("transport profile configured without a name. skipping profile with settings [{}]", - profileSettings.toDelimitedString(',')); - continue; - } else if (DEFAULT_PROFILE.equals(name)) { - profileSettings = Settings.builder() - .put(profileSettings) - .put("port", profileSettings.get("port", PORT.get(this.settings))) - .build(); - } else if (profileSettings.get("port") == null) { - // if profile does not have a port, skip it - logger.info("No port configured for profile [{}], not binding", name); - continue; - } - Settings mergedSettings = Settings.builder() - .put(defaultSettings.getAsMap()) - .put(profileSettings.getAsMap()) - .build(); - result.put(name, mergedSettings); - } - return result; - } - @Override public List getLocalAddresses() { List local = new ArrayList<>(); @@ -712,15 +696,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return local; } - protected void bindServer(final String name, final Settings profileSettings) { + protected void bindServer(ProfileSettings profileSettings) { // Bind and start to accept incoming connections. InetAddress hostAddresses[]; - String bindHosts[] = profileSettings.getAsArray("bind_host", - NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY)); + List profileBindHosts = profileSettings.bindHosts; try { - hostAddresses = networkService.resolveBindHostAddresses(bindHosts); + hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY)); } catch (IOException e) { - throw new BindTransportException("Failed to resolve host " + Arrays.toString(bindHosts), e); + throw new BindTransportException("Failed to resolve host " + profileBindHosts, e); } if (logger.isDebugEnabled()) { String[] addresses = new String[hostAddresses.length]; @@ -734,15 +717,15 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i List boundAddresses = new ArrayList<>(); for (InetAddress hostAddress : hostAddresses) { - boundAddresses.add(bindToPort(name, hostAddress, profileSettings.get("port"))); + boundAddresses.add(bindToPort(profileSettings.profileName, hostAddress, profileSettings.portOrRange)); } - final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, profileSettings, boundAddresses); + final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(profileSettings, boundAddresses); - if (DEFAULT_PROFILE.equals(name)) { + if (profileSettings.isDefaultProfile) { this.boundAddress = boundTransportAddress; } else { - profileBoundAddresses.put(name, boundTransportAddress); + profileBoundAddresses.put(profileSettings.profileName, boundTransportAddress); } } @@ -779,7 +762,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return boundSocket.get(); } - private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, + private BoundTransportAddress createBoundTransportAddress(ProfileSettings profileSettings, List boundAddresses) { String[] boundAddressesHostStrings = new String[boundAddresses.size()]; TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()]; @@ -789,37 +772,30 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i transportBoundAddresses[i] = new TransportAddress(boundAddress); } - String[] publishHosts; - if (DEFAULT_PROFILE.equals(name)) { - publishHosts = PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY); - } else { - publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings); + List publishHosts = profileSettings.publishHosts; + if (profileSettings.isDefaultProfile == false && publishHosts.isEmpty()) { + publishHosts = Arrays.asList(boundAddressesHostStrings); } - if (publishHosts == null || publishHosts.length == 0) { - publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); + if (publishHosts.isEmpty()) { + publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings); } final InetAddress publishInetAddress; try { - publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts); + publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts.toArray(Strings.EMPTY_ARRAY)); } catch (Exception e) { throw new BindTransportException("Failed to resolve publish address", e); } - final int publishPort = resolvePublishPort(name, settings, profileSettings, boundAddresses, publishInetAddress); + final int publishPort = resolvePublishPort(profileSettings, boundAddresses, publishInetAddress); final TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); return new BoundTransportAddress(transportBoundAddresses, publishAddress); } // package private for tests - public static int resolvePublishPort(String profileName, Settings settings, Settings profileSettings, - List boundAddresses, InetAddress publishInetAddress) { - int publishPort; - if (DEFAULT_PROFILE.equals(profileName)) { - publishPort = PUBLISH_PORT.get(settings); - } else { - publishPort = profileSettings.getAsInt("publish_port", -1); - } + public static int resolvePublishPort(ProfileSettings profileSettings, List boundAddresses, + InetAddress publishInetAddress) { + int publishPort = profileSettings.publishPort; // if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress if (publishPort < 0) { @@ -844,7 +820,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } if (publishPort < 0) { - String profileExplanation = DEFAULT_PROFILE.equals(profileName) ? "" : " for profile " + profileName; + String profileExplanation = profileSettings.isDefaultProfile ? "" : " for profile " + profileSettings.profileName; throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " + boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " + "Please specify a unique port by setting " + PORT.getKey() + " or " + @@ -1729,4 +1705,61 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), transmittedBytesMetric.sum()); } + + /** + * Returns all profile settings for the given settings object + */ + public static Set getProfileSettings(Settings settings) { + HashSet profiles = new HashSet<>(); + boolean isDefaultSet = false; + for (String profile : settings.getGroups("transport.profiles.", true).keySet()) { + profiles.add(new ProfileSettings(settings, profile)); + if (DEFAULT_PROFILE.equals(profile)) { + isDefaultSet = true; + } + } + if (isDefaultSet == false) { + profiles.add(new ProfileSettings(settings, DEFAULT_PROFILE)); + } + return Collections.unmodifiableSet(profiles); + } + + /** + * Representation of a transport profile settings for a transport.profiles.$profilename.* + */ + public static final class ProfileSettings { + public final String profileName; + public final boolean tcpNoDelay; + public final boolean tcpKeepAlive; + public final boolean reuseAddress; + public final ByteSizeValue sendBufferSize; + public final ByteSizeValue receiveBufferSize; + public final List bindHosts; + public final List publishHosts; + public final String portOrRange; + public final int publishPort; + public final boolean isDefaultProfile; + + public ProfileSettings(Settings settings, String profileName) { + this.profileName = profileName; + isDefaultProfile = DEFAULT_PROFILE.equals(profileName); + tcpKeepAlive = TCP_KEEP_ALIVE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + tcpNoDelay = TCP_NO_DELAY_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + reuseAddress = TCP_REUSE_ADDRESS_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + sendBufferSize = TCP_SEND_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + receiveBufferSize = TCP_RECEIVE_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + List profileBindHosts = BIND_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + bindHosts = (profileBindHosts.isEmpty() ? NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings) + : profileBindHosts); + publishHosts = PUBLISH_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + Setting concretePort = PORT_PROFILE.getConcreteSettingForNamespace(profileName); + if (concretePort.exists(settings) == false && isDefaultProfile == false) { + throw new IllegalStateException("profile [" + profileName + "] has no port configured"); + } + portOrRange = PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + publishPort = isDefaultProfile ? PUBLISH_PORT.get(settings) : + PUBLISH_PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java b/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java index 42cc9b876ab..0f121f0c401 100644 --- a/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java +++ b/core/src/test/java/org/elasticsearch/transport/PublishPortTests.java @@ -42,34 +42,39 @@ public class PublishPortTests extends ESTestCase { boolean useProfile = randomBoolean(); final String profile; - final Settings settings; - final Settings profileSettings; + Settings baseSettings; + Settings settings; if (useProfile) { - profile = "some_profile"; + baseSettings = Settings.builder().put("transport.profiles.some_profile.port", 0).build(); settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build(); - profileSettings = Settings.builder().put("publish_port", 9080).build(); + settings = Settings.builder().put(settings).put(baseSettings).put("transport.profiles.some_profile.publish_port", 9080).build(); + profile = "some_profile"; + } else { - profile = TcpTransport.DEFAULT_PROFILE; + baseSettings = Settings.EMPTY; settings = Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build(); - profileSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("publish_port", 9080).build();; + settings = randomBoolean() ? settings : + Settings.builder().put(settings).put("transport.profiles.default.publish_port", 9080).build(); + profile = "default"; + } - int publishPort = resolvePublishPort(profile, settings, profileSettings, + int publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(settings, profile), randomAddresses(), getByName("127.0.0.2")); assertThat("Publish port should be explicitly set", publishPort, equalTo(useProfile ? 9080 : 9081)); - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)), getByName("127.0.0.1")); assertThat("Publish port should be derived from matched address", publishPort, equalTo(boundPort)); - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("127.0.0.1", boundPort), address("127.0.0.2", boundPort)), getByName("127.0.0.3")); assertThat("Publish port should be derived from unique port of bound addresses", publishPort, equalTo(boundPort)); try { - resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)), getByName("127.0.0.3")); fail("Expected BindTransportException as publish_port not specified and non-unique port of bound addresses"); @@ -77,13 +82,13 @@ public class PublishPortTests extends ESTestCase { assertThat(e.getMessage(), containsString("Failed to auto-resolve publish port")); } - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)), getByName("127.0.0.1")); assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort)); if (NetworkUtils.SUPPORTS_V6) { - publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, + publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile), asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)), getByName("::1")); assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort)); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 5e12116f00a..5623005f7d9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -62,7 +62,6 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; import java.net.InetSocketAddress; @@ -71,6 +70,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -142,10 +142,6 @@ public class Netty4Transport extends TcpTransport { } } - TransportServiceAdapter transportServiceAdapter() { - return transportServiceAdapter; - } - @Override protected void doStart() { boolean success = false; @@ -154,14 +150,9 @@ public class Netty4Transport extends TcpTransport { if (NetworkService.NETWORK_SERVER.get(settings)) { final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger); this.serverOpenChannels = openChannels; - // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : buildProfileSettings().entrySet()) { - // merge fallback settings with default settings with profile settings so we have complete settings with default values - final Settings settings = Settings.builder() - .put(createFallbackSettings()) - .put(entry.getValue()).build(); - createServerBootstrap(entry.getKey(), settings); - bindServer(entry.getKey(), settings); + for (ProfileSettings profileSettings : profileSettings) { + createServerBootstrap(profileSettings); + bindServer(profileSettings); } } super.doStart(); @@ -204,46 +195,12 @@ public class Netty4Transport extends TcpTransport { return bootstrap; } - private Settings createFallbackSettings() { - Settings.Builder fallbackSettingsBuilder = Settings.builder(); - - List fallbackBindHost = TcpTransport.BIND_HOST.get(settings); - if (fallbackBindHost.isEmpty() == false) { - fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost); - } - - List fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings); - if (fallbackPublishHost.isEmpty() == false) { - fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost); - } - - boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings); - fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay); - - boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings); - fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive); - - boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings); - fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress); - - ByteSizeValue fallbackTcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings); - if (fallbackTcpSendBufferSize.getBytes() >= 0) { - fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize); - } - - ByteSizeValue fallbackTcpBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings); - if (fallbackTcpBufferSize.getBytes() >= 0) { - fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize); - } - - return fallbackSettingsBuilder.build(); - } - - private void createServerBootstrap(String name, Settings settings) { + private void createServerBootstrap(ProfileSettings profileSettings) { + String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", - name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress, + name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress, defaultConnectionProfile.getConnectTimeout(), defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY), defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK), @@ -253,6 +210,7 @@ public class Netty4Transport extends TcpTransport { receivePredictorMin, receivePredictorMax); } + final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name); final ServerBootstrap serverBootstrap = new ServerBootstrap(); @@ -260,34 +218,31 @@ public class Netty4Transport extends TcpTransport { serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory)); serverBootstrap.channel(NioServerSocketChannel.class); - serverBootstrap.childHandler(getServerChannelInitializer(name, settings)); + serverBootstrap.childHandler(getServerChannelInitializer(name)); - serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); - serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); + serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay); + serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive); - final ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings); - if (tcpSendBufferSize != null && tcpSendBufferSize.getBytes() > 0) { - serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes())); + if (profileSettings.sendBufferSize.getBytes() != -1) { + serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes())); } - final ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings); - if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.getBytes() > 0) { - serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.bytesAsInt())); + if (profileSettings.receiveBufferSize.getBytes() != -1) { + serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt())); } serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); - final boolean reuseAddress = TCP_REUSE_ADDRESS.get(settings); - serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); - serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); + serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress); + serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress); serverBootstrap.validate(); serverBootstraps.put(name, serverBootstrap); } - protected ChannelHandler getServerChannelInitializer(String name, Settings settings) { - return new ServerChannelInitializer(name, settings); + protected ChannelHandler getServerChannelInitializer(String name) { + return new ServerChannelInitializer(name); } protected ChannelHandler getClientChannelInitializer() { @@ -455,11 +410,9 @@ public class Netty4Transport extends TcpTransport { protected class ServerChannelInitializer extends ChannelInitializer { protected final String name; - protected final Settings settings; - protected ServerChannelInitializer(String name, Settings settings) { + protected ServerChannelInitializer(String name) { this.name = name; - this.settings = settings; } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index dba5ca2d82d..295e7ab389c 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -92,9 +92,9 @@ public class NettyTransportMultiPortTests extends ESTestCase { .build(); ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { - assertEquals(0, transport.profileBoundAddresses().size()); - assertEquals(1, transport.boundAddress().boundAddresses().length); + try { + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> startTransport(settings, threadPool)); + assertEquals("profile [client1] has no port configured", ex.getMessage()); } finally { terminate(threadPool); } @@ -116,24 +116,6 @@ public class NettyTransportMultiPortTests extends ESTestCase { } } - public void testThatProfileWithoutValidNameIsIgnored() throws Exception { - Settings settings = Settings.builder() - .put("network.host", host) - .put(TcpTransport.PORT.getKey(), 0) - // mimics someone trying to define a profile for .local which is the profile for a node request to itself - .put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", 22) // will not actually bind to this - .put("transport.profiles..port", 23) // will not actually bind to this - .build(); - - ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { - assertEquals(0, transport.profileBoundAddresses().size()); - assertEquals(1, transport.boundAddress().boundAddresses().length); - } finally { - terminate(threadPool); - } - } - private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()), @@ -143,5 +125,4 @@ public class NettyTransportMultiPortTests extends ESTestCase { assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); return transport; } - } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index ef1e127930c..c30731b9b88 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -30,12 +30,15 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -54,6 +57,8 @@ import org.junit.Before; import java.io.IOException; import java.io.UncheckedIOException; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -75,6 +80,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collector; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -2419,4 +2426,176 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceC.close(); } } + + public void testTransportProfilesWithPortAndHost() { + boolean doIPV6 = NetworkUtils.SUPPORTS_V6; + List hosts; + if (doIPV6) { + hosts = Arrays.asList("_local:ipv6_", "_local:ipv4_"); + } else { + hosts = Arrays.asList("_local:ipv4_"); + } + try (MockTransportService serviceC = build(Settings.builder() + .put("name", "TS_TEST") + .put("transport.profiles.default.bind_host", "_local:ipv4_") + .put("transport.profiles.some_profile.port", "8900-9000") + .put("transport.profiles.some_profile.bind_host", "_local:ipv4_") + .put("transport.profiles.some_other_profile.port", "8700-8800") + .putArray("transport.profiles.some_other_profile.bind_host", hosts) + .putArray("transport.profiles.some_other_profile.publish_host", "_local:ipv4_") + .build(), version0, null, true)) { + + serviceC.start(); + serviceC.acceptIncomingRequests(); + Map profileBoundAddresses = serviceC.transport.profileBoundAddresses(); + assertTrue(profileBoundAddresses.containsKey("some_profile")); + assertTrue(profileBoundAddresses.containsKey("some_other_profile")); + assertTrue(profileBoundAddresses.get("some_profile").publishAddress().getPort() >= 8900); + assertTrue(profileBoundAddresses.get("some_profile").publishAddress().getPort() < 9000); + assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().getPort() >= 8700); + assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().getPort() < 8800); + assertEquals(profileBoundAddresses.get("some_profile").boundAddresses().length, 1); + if (doIPV6) { + assertTrue(profileBoundAddresses.get("some_other_profile").boundAddresses().length >= 2); + int ipv4 = 0; + int ipv6 = 0; + for (TransportAddress addr : profileBoundAddresses.get("some_other_profile").boundAddresses()) { + if (addr.address().getAddress() instanceof Inet4Address) { + ipv4++; + } else if (addr.address().getAddress() instanceof Inet6Address) { + ipv6++; + } else { + fail("what kind of address is this: " + addr.address().getAddress()); + } + } + assertTrue("num ipv4 is wrong: " + ipv4, ipv4 >= 1); + assertTrue("num ipv6 is wrong: " + ipv6, ipv6 >= 1); + } else { + assertEquals(profileBoundAddresses.get("some_other_profile").boundAddresses().length, 1); + } + assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().address().getAddress() instanceof Inet4Address); + } + } + + public void testProfileSettings() { + boolean enable = randomBoolean(); + Settings globalSettings = Settings.builder() + .put("network.tcp.no_delay", enable) + .put("network.tcp.keep_alive", enable) + .put("network.tcp.reuse_address", enable) + .put("network.tcp.send_buffer_size", "43000b") + .put("network.tcp.receive_buffer_size", "42000b") + .put("network.publish_host", "the_publish_host") + .put("network.bind_host", "the_bind_host") + .build(); + + Settings globalSettings2 = Settings.builder() + .put("network.tcp.no_delay", !enable) + .put("network.tcp.keep_alive", !enable) + .put("network.tcp.reuse_address", !enable) + .put("network.tcp.send_buffer_size", "4b") + .put("network.tcp.receive_buffer_size", "3b") + .put("network.publish_host", "another_publish_host") + .put("network.bind_host", "another_bind_host") + .build(); + + Settings transportSettings = Settings.builder() + .put("transport.tcp_no_delay", enable) + .put("transport.tcp.keep_alive", enable) + .put("transport.tcp.reuse_address", enable) + .put("transport.tcp.send_buffer_size", "43000b") + .put("transport.tcp.receive_buffer_size", "42000b") + .put("transport.publish_host", "the_publish_host") + .put("transport.tcp.port", "9700-9800") + .put("transport.bind_host", "the_bind_host") + .put(globalSettings2) + .build(); + + Settings transportSettings2 = Settings.builder() + .put("transport.tcp_no_delay", !enable) + .put("transport.tcp.keep_alive", !enable) + .put("transport.tcp.reuse_address", !enable) + .put("transport.tcp.send_buffer_size", "5b") + .put("transport.tcp.receive_buffer_size", "6b") + .put("transport.publish_host", "another_publish_host") + .put("transport.tcp.port", "9702-9802") + .put("transport.bind_host", "another_bind_host") + .put(globalSettings2) + .build(); + Settings defaultProfileSettings = Settings.builder() + .put("transport.profiles.default.tcp_no_delay", enable) + .put("transport.profiles.default.tcp_keep_alive", enable) + .put("transport.profiles.default.reuse_address", enable) + .put("transport.profiles.default.send_buffer_size", "43000b") + .put("transport.profiles.default.receive_buffer_size", "42000b") + .put("transport.profiles.default.port", "9700-9800") + .put("transport.profiles.default.publish_host", "the_publish_host") + .put("transport.profiles.default.bind_host", "the_bind_host") + .put("transport.profiles.default.publish_port", 42) + .put(randomBoolean() ? transportSettings2 : globalSettings2) // ensure that we have profile precedence + .build(); + + Settings profileSettings = Settings.builder() + .put("transport.profiles.some_profile.tcp_no_delay", enable) + .put("transport.profiles.some_profile.tcp_keep_alive", enable) + .put("transport.profiles.some_profile.reuse_address", enable) + .put("transport.profiles.some_profile.send_buffer_size", "43000b") + .put("transport.profiles.some_profile.receive_buffer_size", "42000b") + .put("transport.profiles.some_profile.port", "9700-9800") + .put("transport.profiles.some_profile.publish_host", "the_publish_host") + .put("transport.profiles.some_profile.bind_host", "the_bind_host") + .put("transport.profiles.some_profile.publish_port", 42) + .put(randomBoolean() ? transportSettings2 : globalSettings2) // ensure that we have profile precedence + .put(randomBoolean() ? defaultProfileSettings : Settings.EMPTY) + .build(); + + Settings randomSettings = randomFrom(random(), globalSettings, transportSettings, profileSettings); + ClusterSettings clusterSettings = new ClusterSettings(randomSettings, ClusterSettings + .BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.validate(randomSettings); + TcpTransport.ProfileSettings settings = new TcpTransport.ProfileSettings( + Settings.builder().put(randomSettings).put("transport.profiles.some_profile.port", "9700-9800").build(), // port is required + "some_profile"); + + assertEquals(enable, settings.tcpNoDelay); + assertEquals(enable, settings.tcpKeepAlive); + assertEquals(enable, settings.reuseAddress); + assertEquals(43000, settings.sendBufferSize.getBytes()); + assertEquals(42000, settings.receiveBufferSize.getBytes()); + if (randomSettings == profileSettings) { + assertEquals(42, settings.publishPort); + } else { + assertEquals(-1, settings.publishPort); + } + + if (randomSettings == globalSettings) { // publish host has no global fallback for the profile since we later resolve it based on + // the bound address + assertEquals(Collections.emptyList(), settings.publishHosts); + } else { + assertEquals(Collections.singletonList("the_publish_host"), settings.publishHosts); + } + assertEquals("9700-9800", settings.portOrRange); + assertEquals(Collections.singletonList("the_bind_host"), settings.bindHosts); + } + + public void testProfilesIncludesDefault() { + Set profileSettings = TcpTransport.getProfileSettings(Settings.EMPTY); + assertEquals(1, profileSettings.size()); + assertEquals(TcpTransport.DEFAULT_PROFILE, profileSettings.stream().findAny().get().profileName); + + profileSettings = TcpTransport.getProfileSettings(Settings.builder() + .put("transport.profiles.test.port", "0") + .build()); + assertEquals(2, profileSettings.size()); + assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors + .toSet())); + + profileSettings = TcpTransport.getProfileSettings(Settings.builder() + .put("transport.profiles.test.port", "0") + .put("transport.profiles.default.port", "0") + .build()); + assertEquals(2, profileSettings.size()); + assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors + .toSet())); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 381520d4627..f2849705e05 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -389,10 +389,8 @@ public class MockTcpTransport extends TcpTransport try { if (NetworkService.NETWORK_SERVER.get(settings)) { // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : buildProfileSettings().entrySet()) { - final Settings profileSettings = Settings.builder() - .put(entry.getValue()).build(); - bindServer(entry.getKey(), profileSettings); + for (ProfileSettings profileSettings : profileSettings) { + bindServer(profileSettings); } } super.doStart(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index a7c7d187806..335d438f577 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -45,7 +45,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; @@ -69,7 +69,6 @@ public class NioTransport extends TcpTransport { intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); private final TcpReadHandler tcpReadHandler = new TcpReadHandler(this); - private final BigArrays bigArrays; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private final OpenChannels openChannels = new OpenChannels(logger); private final ArrayList acceptors = new ArrayList<>(); @@ -80,7 +79,6 @@ public class NioTransport extends TcpTransport { public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - this.bigArrays = bigArrays; } @Override @@ -175,13 +173,9 @@ public class NioTransport extends TcpTransport { acceptors.add(acceptor); } // loop through all profiles and start them up, special handling for default one - for (Map.Entry entry : buildProfileSettings().entrySet()) { - // merge fallback settings with default settings with profile settings so we have complete settings with default values - final Settings profileSettings = Settings.builder() - .put(createFallbackSettings()) - .put(entry.getValue()).build(); - profileToChannelFactory.putIfAbsent(entry.getKey(), new ChannelFactory(profileSettings, tcpReadHandler)); - bindServer(entry.getKey(), profileSettings); + for (ProfileSettings profileSettings : profileSettings) { + profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, tcpReadHandler)); + bindServer(profileSettings); } } client = createClient(); @@ -269,7 +263,7 @@ public class NioTransport extends TcpTransport { private NioClient createClient() { Supplier selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); - ChannelFactory channelFactory = new ChannelFactory(settings, tcpReadHandler); + ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler); return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index 84c36d41104..420af25fe6a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -19,8 +19,6 @@ package org.elasticsearch.transport.nio.channel; -import org.elasticsearch.common.CheckedSupplier; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.TcpReadHandler; @@ -31,9 +29,6 @@ import java.net.ServerSocket; import java.net.Socket; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.security.AccessController; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; public class ChannelFactory { @@ -44,12 +39,12 @@ public class ChannelFactory { private final int tcpReceiveBufferSize; private final TcpReadHandler handler; - public ChannelFactory(Settings settings, TcpReadHandler handler) { - tcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings); - tcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings); - tcpReusedAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings); - tcpSendBufferSize = Math.toIntExact(TcpTransport.TCP_SEND_BUFFER_SIZE.get(settings).getBytes()); - tcpReceiveBufferSize = Math.toIntExact(TcpTransport.TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes()); + public ChannelFactory(TcpTransport.ProfileSettings profileSettings, TcpReadHandler handler) { + tcpNoDelay = profileSettings.tcpNoDelay; + tcpKeepAlive = profileSettings.tcpKeepAlive; + tcpReusedAddress = profileSettings.reuseAddress; + tcpSendBufferSize = Math.toIntExact(profileSettings.sendBufferSize.getBytes()); + tcpReceiveBufferSize = Math.toIntExact(profileSettings.receiveBufferSize.getBytes()); this.handler = handler; } @@ -94,12 +89,4 @@ public class ChannelFactory { socket.setSendBufferSize(tcpReceiveBufferSize); } } - - private static T getSocketChannel(CheckedSupplier supplier) throws IOException { - try { - return AccessController.doPrivileged((PrivilegedExceptionAction) supplier::get); - } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); - } - } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java index c3909a06440..7db9f48ca45 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/AbstractNioChannelTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.After; import org.junit.Before; @@ -39,7 +40,8 @@ import static org.mockito.Mockito.mock; public abstract class AbstractNioChannelTestCase extends ESTestCase { - ChannelFactory channelFactory = new ChannelFactory(Settings.EMPTY, mock(TcpReadHandler.class)); + ChannelFactory channelFactory = new ChannelFactory(new TcpTransport.ProfileSettings(Settings.EMPTY, "default"), + mock(TcpReadHandler.class)); MockServerSocket mockServerSocket; private Thread serverThread;