Validate `transport.profiles.*` settings (#25508)

Transport profiles unfortunately have never been validated. Yet, it's very
easy to make a mistake when configuring profiles which will most likely stay
undetected since we don't validate the settings but allow almost everything
based on the wildcard in `transport.profiles.*`. This change removes the
settings subset based parsing of profiles but rather uses concrete affix settings
for the profiles which makes it easier to fall back to higher level settings since
the fallback settings are present when the profile setting is parsed. Previously, it was
unclear in the code which setting is used ie. if the profiles settings (with removed
prefixes) or the global node setting. There is no distinction anymore since we don't pull
prefix based settings.
This commit is contained in:
Simon Willnauer 2017-07-07 09:40:59 +02:00 committed by GitHub
parent c96257ca73
commit 1f67d079b1
11 changed files with 353 additions and 227 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.SecureSM;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.HttpTransportSettings;
@ -320,30 +321,15 @@ final class Security {
* @param policy the {@link Permissions} instance to apply the dynamic {@link SocketPermission}s to * @param policy the {@link Permissions} instance to apply the dynamic {@link SocketPermission}s to
* @param settings the {@link Settings} instance to read the transport settings from * @param settings the {@link Settings} instance to read the transport settings from
*/ */
private static void addSocketPermissionForTransportProfiles( private static void addSocketPermissionForTransportProfiles(final Permissions policy, final Settings settings) {
final Permissions policy,
final Settings settings) {
// transport is way over-engineered // transport is way over-engineered
final Map<String, Settings> profiles = new HashMap<>(TcpTransport.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups()); Set<TcpTransport.ProfileSettings> profiles = TcpTransport.getProfileSettings(settings);
profiles.putIfAbsent(TcpTransport.DEFAULT_PROFILE, Settings.EMPTY); Set<String> uniquePortRanges = new HashSet<>();
// loop through all profiles and add permissions for each one
// loop through all profiles and add permissions for each one, if it's valid; otherwise Netty transports are lenient and ignores it for (final TcpTransport.ProfileSettings profile : profiles) {
for (final Map.Entry<String, Settings> entry : profiles.entrySet()) { if (uniquePortRanges.add(profile.portOrRange)) {
final Settings profileSettings = entry.getValue(); // profiles fall back to the transport.port if it's not explicit but we want to only add one permission per range
final String name = entry.getKey(); addSocketPermissionForPortRange(policy, profile.portOrRange);
// a profile is only valid if it's the default profile, or if it has an actual name and specifies a port
// TODO: can this leniency be removed?
final boolean valid =
TcpTransport.DEFAULT_PROFILE.equals(name) ||
(name != null && name.length() > 0 && profileSettings.get("port") != null);
if (valid) {
final String transportRange = profileSettings.get("port");
if (transportRange != null) {
addSocketPermissionForPortRange(policy, transportRange);
} else {
addSocketPermissionForTransport(policy, settings);
}
} }
} }
} }

View File

@ -270,12 +270,20 @@ public final class ClusterSettings extends AbstractScopedSettings {
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
Transport.TRANSPORT_TCP_COMPRESS, Transport.TRANSPORT_TCP_COMPRESS,
TcpTransport.TRANSPORT_PROFILES_SETTING,
TcpTransport.HOST, TcpTransport.HOST,
TcpTransport.PUBLISH_HOST, TcpTransport.PUBLISH_HOST,
TcpTransport.BIND_HOST, TcpTransport.BIND_HOST,
TcpTransport.PUBLISH_PORT, TcpTransport.PUBLISH_PORT,
TcpTransport.PORT, TcpTransport.PORT,
TcpTransport.BIND_HOST_PROFILE,
TcpTransport.PUBLISH_HOST_PROFILE,
TcpTransport.PUBLISH_PORT_PROFILE,
TcpTransport.PORT_PROFILE,
TcpTransport.TCP_NO_DELAY_PROFILE,
TcpTransport.TCP_KEEP_ALIVE_PROFILE,
TcpTransport.TCP_REUSE_ADDRESS_PROFILE,
TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE,
TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE,
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY, TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
TcpTransport.CONNECTIONS_PER_NODE_BULK, TcpTransport.CONNECTIONS_PER_NODE_BULK,
TcpTransport.CONNECTIONS_PER_NODE_REG, TcpTransport.CONNECTIONS_PER_NODE_REG,

View File

@ -108,6 +108,7 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Setting.affixKeySetting;
import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.groupSetting; import static org.elasticsearch.common.settings.Setting.groupSetting;
import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.settings.Setting.intSetting;
@ -133,8 +134,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
public static final Setting<Integer> PUBLISH_PORT = public static final Setting<Integer> PUBLISH_PORT =
intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope); intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope);
public static final String DEFAULT_PROFILE = "default"; public static final String DEFAULT_PROFILE = "default";
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING =
groupSetting("transport.profiles.", Setting.Property.Dynamic, Setting.Property.NodeScope);
// the scheduled internal ping interval setting, defaults to disabled (-1) // the scheduled internal ping interval setting, defaults to disabled (-1)
public static final Setting<TimeValue> PING_SCHEDULE = public static final Setting<TimeValue> PING_SCHEDULE =
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope); timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
@ -164,6 +163,26 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
Setting.Property.NodeScope); Setting.Property.NodeScope);
public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",
key -> boolSetting(key, TcpTransport.TCP_NO_DELAY, Setting.Property.NodeScope));
public static final Setting.AffixSetting<Boolean> TCP_KEEP_ALIVE_PROFILE = affixKeySetting("transport.profiles.", "tcp_keep_alive",
key -> boolSetting(key, TcpTransport.TCP_KEEP_ALIVE, Setting.Property.NodeScope));
public static final Setting.AffixSetting<Boolean> TCP_REUSE_ADDRESS_PROFILE = affixKeySetting("transport.profiles.", "reuse_address",
key -> boolSetting(key, TcpTransport.TCP_REUSE_ADDRESS, Setting.Property.NodeScope));
public static final Setting.AffixSetting<ByteSizeValue> TCP_SEND_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.",
"send_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope));
public static final Setting.AffixSetting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.",
"receive_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope));
public static final Setting.AffixSetting<List<String>> BIND_HOST_PROFILE = affixKeySetting("transport.profiles.", "bind_host",
key -> listSetting(key, BIND_HOST, Function.identity(), Setting.Property.NodeScope));
public static final Setting.AffixSetting<List<String>> PUBLISH_HOST_PROFILE = affixKeySetting("transport.profiles.", "publish_host",
key -> listSetting(key, PUBLISH_HOST, Function.identity(), Setting.Property.NodeScope));
public static final Setting.AffixSetting<String> PORT_PROFILE = affixKeySetting("transport.profiles.", "port",
key -> new Setting(key, PORT, Function.identity(), Setting.Property.NodeScope));
public static final Setting.AffixSetting<Integer> PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port",
key -> intSetting(key, -1, -1, Setting.Property.NodeScope));
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final int PING_DATA_SIZE = -1; private static final int PING_DATA_SIZE = -1;
private final CircuitBreakerService circuitBreakerService; private final CircuitBreakerService circuitBreakerService;
@ -173,6 +192,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
private final BigArrays bigArrays; private final BigArrays bigArrays;
protected final NetworkService networkService; protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;
protected volatile TransportServiceAdapter transportServiceAdapter; protected volatile TransportServiceAdapter transportServiceAdapter;
// node id to actual channel // node id to actual channel
@ -204,6 +224,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) { NetworkService networkService) {
super(settings); super(settings);
this.profileSettings = getProfileSettings(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
@ -664,43 +685,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return unmodifiableMap(new HashMap<>(profileBoundAddresses)); return unmodifiableMap(new HashMap<>(profileBoundAddresses));
} }
protected Map<String, Settings> buildProfileSettings() {
// extract default profile first and create standard bootstrap
Map<String, Settings> profiles = TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true);
if (!profiles.containsKey(DEFAULT_PROFILE)) {
profiles = new HashMap<>(profiles);
profiles.put(DEFAULT_PROFILE, Settings.EMPTY);
}
Settings defaultSettings = profiles.get(DEFAULT_PROFILE);
Map<String, Settings> result = new HashMap<>();
// loop through all profiles and start them up, special handling for default one
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
Settings profileSettings = entry.getValue();
String name = entry.getKey();
if (!Strings.hasLength(name)) {
logger.info("transport profile configured without a name. skipping profile with settings [{}]",
profileSettings.toDelimitedString(','));
continue;
} else if (DEFAULT_PROFILE.equals(name)) {
profileSettings = Settings.builder()
.put(profileSettings)
.put("port", profileSettings.get("port", PORT.get(this.settings)))
.build();
} else if (profileSettings.get("port") == null) {
// if profile does not have a port, skip it
logger.info("No port configured for profile [{}], not binding", name);
continue;
}
Settings mergedSettings = Settings.builder()
.put(defaultSettings.getAsMap())
.put(profileSettings.getAsMap())
.build();
result.put(name, mergedSettings);
}
return result;
}
@Override @Override
public List<String> getLocalAddresses() { public List<String> getLocalAddresses() {
List<String> local = new ArrayList<>(); List<String> local = new ArrayList<>();
@ -712,15 +696,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return local; return local;
} }
protected void bindServer(final String name, final Settings profileSettings) { protected void bindServer(ProfileSettings profileSettings) {
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
InetAddress hostAddresses[]; InetAddress hostAddresses[];
String bindHosts[] = profileSettings.getAsArray("bind_host", List<String> profileBindHosts = profileSettings.bindHosts;
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY));
try { try {
hostAddresses = networkService.resolveBindHostAddresses(bindHosts); hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
} catch (IOException e) { } catch (IOException e) {
throw new BindTransportException("Failed to resolve host " + Arrays.toString(bindHosts), e); throw new BindTransportException("Failed to resolve host " + profileBindHosts, e);
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
String[] addresses = new String[hostAddresses.length]; String[] addresses = new String[hostAddresses.length];
@ -734,15 +717,15 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
List<InetSocketAddress> boundAddresses = new ArrayList<>(); List<InetSocketAddress> boundAddresses = new ArrayList<>();
for (InetAddress hostAddress : hostAddresses) { for (InetAddress hostAddress : hostAddresses) {
boundAddresses.add(bindToPort(name, hostAddress, profileSettings.get("port"))); boundAddresses.add(bindToPort(profileSettings.profileName, hostAddress, profileSettings.portOrRange));
} }
final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, profileSettings, boundAddresses); final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(profileSettings, boundAddresses);
if (DEFAULT_PROFILE.equals(name)) { if (profileSettings.isDefaultProfile) {
this.boundAddress = boundTransportAddress; this.boundAddress = boundTransportAddress;
} else { } else {
profileBoundAddresses.put(name, boundTransportAddress); profileBoundAddresses.put(profileSettings.profileName, boundTransportAddress);
} }
} }
@ -779,7 +762,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return boundSocket.get(); return boundSocket.get();
} }
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, private BoundTransportAddress createBoundTransportAddress(ProfileSettings profileSettings,
List<InetSocketAddress> boundAddresses) { List<InetSocketAddress> boundAddresses) {
String[] boundAddressesHostStrings = new String[boundAddresses.size()]; String[] boundAddressesHostStrings = new String[boundAddresses.size()];
TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()]; TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()];
@ -789,37 +772,30 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
transportBoundAddresses[i] = new TransportAddress(boundAddress); transportBoundAddresses[i] = new TransportAddress(boundAddress);
} }
String[] publishHosts; List<String> publishHosts = profileSettings.publishHosts;
if (DEFAULT_PROFILE.equals(name)) { if (profileSettings.isDefaultProfile == false && publishHosts.isEmpty()) {
publishHosts = PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY); publishHosts = Arrays.asList(boundAddressesHostStrings);
} else {
publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings);
} }
if (publishHosts == null || publishHosts.length == 0) { if (publishHosts.isEmpty()) {
publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings);
} }
final InetAddress publishInetAddress; final InetAddress publishInetAddress;
try { try {
publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts); publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts.toArray(Strings.EMPTY_ARRAY));
} catch (Exception e) { } catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e); throw new BindTransportException("Failed to resolve publish address", e);
} }
final int publishPort = resolvePublishPort(name, settings, profileSettings, boundAddresses, publishInetAddress); final int publishPort = resolvePublishPort(profileSettings, boundAddresses, publishInetAddress);
final TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); final TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
return new BoundTransportAddress(transportBoundAddresses, publishAddress); return new BoundTransportAddress(transportBoundAddresses, publishAddress);
} }
// package private for tests // package private for tests
public static int resolvePublishPort(String profileName, Settings settings, Settings profileSettings, public static int resolvePublishPort(ProfileSettings profileSettings, List<InetSocketAddress> boundAddresses,
List<InetSocketAddress> boundAddresses, InetAddress publishInetAddress) { InetAddress publishInetAddress) {
int publishPort; int publishPort = profileSettings.publishPort;
if (DEFAULT_PROFILE.equals(profileName)) {
publishPort = PUBLISH_PORT.get(settings);
} else {
publishPort = profileSettings.getAsInt("publish_port", -1);
}
// if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress // if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress
if (publishPort < 0) { if (publishPort < 0) {
@ -844,7 +820,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
} }
if (publishPort < 0) { if (publishPort < 0) {
String profileExplanation = DEFAULT_PROFILE.equals(profileName) ? "" : " for profile " + profileName; String profileExplanation = profileSettings.isDefaultProfile ? "" : " for profile " + profileSettings.profileName;
throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " + throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " +
boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " + boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " +
"Please specify a unique port by setting " + PORT.getKey() + " or " + "Please specify a unique port by setting " + PORT.getKey() + " or " +
@ -1729,4 +1705,61 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
transmittedBytesMetric.sum()); transmittedBytesMetric.sum());
} }
/**
* Returns all profile settings for the given settings object
*/
public static Set<ProfileSettings> getProfileSettings(Settings settings) {
HashSet<ProfileSettings> profiles = new HashSet<>();
boolean isDefaultSet = false;
for (String profile : settings.getGroups("transport.profiles.", true).keySet()) {
profiles.add(new ProfileSettings(settings, profile));
if (DEFAULT_PROFILE.equals(profile)) {
isDefaultSet = true;
}
}
if (isDefaultSet == false) {
profiles.add(new ProfileSettings(settings, DEFAULT_PROFILE));
}
return Collections.unmodifiableSet(profiles);
}
/**
* Representation of a transport profile settings for a <tt>transport.profiles.$profilename.*</tt>
*/
public static final class ProfileSettings {
public final String profileName;
public final boolean tcpNoDelay;
public final boolean tcpKeepAlive;
public final boolean reuseAddress;
public final ByteSizeValue sendBufferSize;
public final ByteSizeValue receiveBufferSize;
public final List<String> bindHosts;
public final List<String> publishHosts;
public final String portOrRange;
public final int publishPort;
public final boolean isDefaultProfile;
public ProfileSettings(Settings settings, String profileName) {
this.profileName = profileName;
isDefaultProfile = DEFAULT_PROFILE.equals(profileName);
tcpKeepAlive = TCP_KEEP_ALIVE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
tcpNoDelay = TCP_NO_DELAY_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
reuseAddress = TCP_REUSE_ADDRESS_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
sendBufferSize = TCP_SEND_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
receiveBufferSize = TCP_RECEIVE_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
List<String> profileBindHosts = BIND_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
bindHosts = (profileBindHosts.isEmpty() ? NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings)
: profileBindHosts);
publishHosts = PUBLISH_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
Setting<String> concretePort = PORT_PROFILE.getConcreteSettingForNamespace(profileName);
if (concretePort.exists(settings) == false && isDefaultProfile == false) {
throw new IllegalStateException("profile [" + profileName + "] has no port configured");
}
portOrRange = PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
publishPort = isDefaultProfile ? PUBLISH_PORT.get(settings) :
PUBLISH_PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
}
}
} }

View File

@ -42,34 +42,39 @@ public class PublishPortTests extends ESTestCase {
boolean useProfile = randomBoolean(); boolean useProfile = randomBoolean();
final String profile; final String profile;
final Settings settings; Settings baseSettings;
final Settings profileSettings; Settings settings;
if (useProfile) { if (useProfile) {
profile = "some_profile"; baseSettings = Settings.builder().put("transport.profiles.some_profile.port", 0).build();
settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build(); settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build();
profileSettings = Settings.builder().put("publish_port", 9080).build(); settings = Settings.builder().put(settings).put(baseSettings).put("transport.profiles.some_profile.publish_port", 9080).build();
profile = "some_profile";
} else { } else {
profile = TcpTransport.DEFAULT_PROFILE; baseSettings = Settings.EMPTY;
settings = Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build(); settings = Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build();
profileSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("publish_port", 9080).build();; settings = randomBoolean() ? settings :
Settings.builder().put(settings).put("transport.profiles.default.publish_port", 9080).build();
profile = "default";
} }
int publishPort = resolvePublishPort(profile, settings, profileSettings, int publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(settings, profile),
randomAddresses(), getByName("127.0.0.2")); randomAddresses(), getByName("127.0.0.2"));
assertThat("Publish port should be explicitly set", publishPort, equalTo(useProfile ? 9080 : 9081)); assertThat("Publish port should be explicitly set", publishPort, equalTo(useProfile ? 9080 : 9081));
publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile),
asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)), asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)),
getByName("127.0.0.1")); getByName("127.0.0.1"));
assertThat("Publish port should be derived from matched address", publishPort, equalTo(boundPort)); assertThat("Publish port should be derived from matched address", publishPort, equalTo(boundPort));
publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile),
asList(address("127.0.0.1", boundPort), address("127.0.0.2", boundPort)), asList(address("127.0.0.1", boundPort), address("127.0.0.2", boundPort)),
getByName("127.0.0.3")); getByName("127.0.0.3"));
assertThat("Publish port should be derived from unique port of bound addresses", publishPort, equalTo(boundPort)); assertThat("Publish port should be derived from unique port of bound addresses", publishPort, equalTo(boundPort));
try { try {
resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile),
asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)), asList(address("127.0.0.1", boundPort), address("127.0.0.2", otherBoundPort)),
getByName("127.0.0.3")); getByName("127.0.0.3"));
fail("Expected BindTransportException as publish_port not specified and non-unique port of bound addresses"); fail("Expected BindTransportException as publish_port not specified and non-unique port of bound addresses");
@ -77,13 +82,13 @@ public class PublishPortTests extends ESTestCase {
assertThat(e.getMessage(), containsString("Failed to auto-resolve publish port")); assertThat(e.getMessage(), containsString("Failed to auto-resolve publish port"));
} }
publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile),
asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)), asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)),
getByName("127.0.0.1")); getByName("127.0.0.1"));
assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort)); assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort));
if (NetworkUtils.SUPPORTS_V6) { if (NetworkUtils.SUPPORTS_V6) {
publishPort = resolvePublishPort(profile, Settings.EMPTY, Settings.EMPTY, publishPort = resolvePublishPort(new TcpTransport.ProfileSettings(baseSettings, profile),
asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)), asList(address("0.0.0.0", boundPort), address("127.0.0.2", otherBoundPort)),
getByName("::1")); getByName("::1"));
assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort)); assertThat("Publish port should be derived from matching wildcard address", publishPort, equalTo(boundPort));

View File

@ -62,7 +62,6 @@ import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -71,6 +70,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -142,10 +142,6 @@ public class Netty4Transport extends TcpTransport<Channel> {
} }
} }
TransportServiceAdapter transportServiceAdapter() {
return transportServiceAdapter;
}
@Override @Override
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
@ -154,14 +150,9 @@ public class Netty4Transport extends TcpTransport<Channel> {
if (NetworkService.NETWORK_SERVER.get(settings)) { if (NetworkService.NETWORK_SERVER.get(settings)) {
final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger); final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels; this.serverOpenChannels = openChannels;
// loop through all profiles and start them up, special handling for default one for (ProfileSettings profileSettings : profileSettings) {
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) { createServerBootstrap(profileSettings);
// merge fallback settings with default settings with profile settings so we have complete settings with default values bindServer(profileSettings);
final Settings settings = Settings.builder()
.put(createFallbackSettings())
.put(entry.getValue()).build();
createServerBootstrap(entry.getKey(), settings);
bindServer(entry.getKey(), settings);
} }
} }
super.doStart(); super.doStart();
@ -204,46 +195,12 @@ public class Netty4Transport extends TcpTransport<Channel> {
return bootstrap; return bootstrap;
} }
private Settings createFallbackSettings() { private void createServerBootstrap(ProfileSettings profileSettings) {
Settings.Builder fallbackSettingsBuilder = Settings.builder(); String name = profileSettings.profileName;
List<String> fallbackBindHost = TcpTransport.BIND_HOST.get(settings);
if (fallbackBindHost.isEmpty() == false) {
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
}
List<String> fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings);
if (fallbackPublishHost.isEmpty() == false) {
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
}
boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings);
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings);
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings);
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
ByteSizeValue fallbackTcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
if (fallbackTcpSendBufferSize.getBytes() >= 0) {
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
}
ByteSizeValue fallbackTcpBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (fallbackTcpBufferSize.getBytes() >= 0) {
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
}
return fallbackSettingsBuilder.build();
}
private void createServerBootstrap(String name, Settings settings) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress, name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
defaultConnectionProfile.getConnectTimeout(), defaultConnectionProfile.getConnectTimeout(),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY), defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK), defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
@ -253,6 +210,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
receivePredictorMin, receivePredictorMax); receivePredictorMin, receivePredictorMax);
} }
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name); final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);
final ServerBootstrap serverBootstrap = new ServerBootstrap(); final ServerBootstrap serverBootstrap = new ServerBootstrap();
@ -260,34 +218,31 @@ public class Netty4Transport extends TcpTransport<Channel> {
serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory)); serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory));
serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(getServerChannelInitializer(name, settings)); serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
final ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings); if (profileSettings.sendBufferSize.getBytes() != -1) {
if (tcpSendBufferSize != null && tcpSendBufferSize.getBytes() > 0) { serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes()));
serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
} }
final ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings); if (profileSettings.receiveBufferSize.getBytes() != -1) {
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.getBytes() > 0) { serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt()));
serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.bytesAsInt()));
} }
serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
final boolean reuseAddress = TCP_REUSE_ADDRESS.get(settings); serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
serverBootstrap.validate(); serverBootstrap.validate();
serverBootstraps.put(name, serverBootstrap); serverBootstraps.put(name, serverBootstrap);
} }
protected ChannelHandler getServerChannelInitializer(String name, Settings settings) { protected ChannelHandler getServerChannelInitializer(String name) {
return new ServerChannelInitializer(name, settings); return new ServerChannelInitializer(name);
} }
protected ChannelHandler getClientChannelInitializer() { protected ChannelHandler getClientChannelInitializer() {
@ -455,11 +410,9 @@ public class Netty4Transport extends TcpTransport<Channel> {
protected class ServerChannelInitializer extends ChannelInitializer<Channel> { protected class ServerChannelInitializer extends ChannelInitializer<Channel> {
protected final String name; protected final String name;
protected final Settings settings;
protected ServerChannelInitializer(String name, Settings settings) { protected ServerChannelInitializer(String name) {
this.name = name; this.name = name;
this.settings = settings;
} }
@Override @Override

View File

@ -92,9 +92,9 @@ public class NettyTransportMultiPortTests extends ESTestCase {
.build(); .build();
ThreadPool threadPool = new TestThreadPool("tst"); ThreadPool threadPool = new TestThreadPool("tst");
try (TcpTransport<?> transport = startTransport(settings, threadPool)) { try {
assertEquals(0, transport.profileBoundAddresses().size()); IllegalStateException ex = expectThrows(IllegalStateException.class, () -> startTransport(settings, threadPool));
assertEquals(1, transport.boundAddress().boundAddresses().length); assertEquals("profile [client1] has no port configured", ex.getMessage());
} finally { } finally {
terminate(threadPool); terminate(threadPool);
} }
@ -116,24 +116,6 @@ public class NettyTransportMultiPortTests extends ESTestCase {
} }
} }
public void testThatProfileWithoutValidNameIsIgnored() throws Exception {
Settings settings = Settings.builder()
.put("network.host", host)
.put(TcpTransport.PORT.getKey(), 0)
// mimics someone trying to define a profile for .local which is the profile for a node request to itself
.put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", 22) // will not actually bind to this
.put("transport.profiles..port", 23) // will not actually bind to this
.build();
ThreadPool threadPool = new TestThreadPool("tst");
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
assertEquals(0, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally {
terminate(threadPool);
}
}
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) { private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
TcpTransport<?> transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()), TcpTransport<?> transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
@ -143,5 +125,4 @@ public class NettyTransportMultiPortTests extends ESTestCase {
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
return transport; return transport;
} }
} }

View File

@ -30,12 +30,15 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -54,6 +57,8 @@ import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
@ -75,6 +80,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -2419,4 +2426,176 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceC.close(); serviceC.close();
} }
} }
public void testTransportProfilesWithPortAndHost() {
boolean doIPV6 = NetworkUtils.SUPPORTS_V6;
List<String> hosts;
if (doIPV6) {
hosts = Arrays.asList("_local:ipv6_", "_local:ipv4_");
} else {
hosts = Arrays.asList("_local:ipv4_");
}
try (MockTransportService serviceC = build(Settings.builder()
.put("name", "TS_TEST")
.put("transport.profiles.default.bind_host", "_local:ipv4_")
.put("transport.profiles.some_profile.port", "8900-9000")
.put("transport.profiles.some_profile.bind_host", "_local:ipv4_")
.put("transport.profiles.some_other_profile.port", "8700-8800")
.putArray("transport.profiles.some_other_profile.bind_host", hosts)
.putArray("transport.profiles.some_other_profile.publish_host", "_local:ipv4_")
.build(), version0, null, true)) {
serviceC.start();
serviceC.acceptIncomingRequests();
Map<String, BoundTransportAddress> profileBoundAddresses = serviceC.transport.profileBoundAddresses();
assertTrue(profileBoundAddresses.containsKey("some_profile"));
assertTrue(profileBoundAddresses.containsKey("some_other_profile"));
assertTrue(profileBoundAddresses.get("some_profile").publishAddress().getPort() >= 8900);
assertTrue(profileBoundAddresses.get("some_profile").publishAddress().getPort() < 9000);
assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().getPort() >= 8700);
assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().getPort() < 8800);
assertEquals(profileBoundAddresses.get("some_profile").boundAddresses().length, 1);
if (doIPV6) {
assertTrue(profileBoundAddresses.get("some_other_profile").boundAddresses().length >= 2);
int ipv4 = 0;
int ipv6 = 0;
for (TransportAddress addr : profileBoundAddresses.get("some_other_profile").boundAddresses()) {
if (addr.address().getAddress() instanceof Inet4Address) {
ipv4++;
} else if (addr.address().getAddress() instanceof Inet6Address) {
ipv6++;
} else {
fail("what kind of address is this: " + addr.address().getAddress());
}
}
assertTrue("num ipv4 is wrong: " + ipv4, ipv4 >= 1);
assertTrue("num ipv6 is wrong: " + ipv6, ipv6 >= 1);
} else {
assertEquals(profileBoundAddresses.get("some_other_profile").boundAddresses().length, 1);
}
assertTrue(profileBoundAddresses.get("some_other_profile").publishAddress().address().getAddress() instanceof Inet4Address);
}
}
public void testProfileSettings() {
boolean enable = randomBoolean();
Settings globalSettings = Settings.builder()
.put("network.tcp.no_delay", enable)
.put("network.tcp.keep_alive", enable)
.put("network.tcp.reuse_address", enable)
.put("network.tcp.send_buffer_size", "43000b")
.put("network.tcp.receive_buffer_size", "42000b")
.put("network.publish_host", "the_publish_host")
.put("network.bind_host", "the_bind_host")
.build();
Settings globalSettings2 = Settings.builder()
.put("network.tcp.no_delay", !enable)
.put("network.tcp.keep_alive", !enable)
.put("network.tcp.reuse_address", !enable)
.put("network.tcp.send_buffer_size", "4b")
.put("network.tcp.receive_buffer_size", "3b")
.put("network.publish_host", "another_publish_host")
.put("network.bind_host", "another_bind_host")
.build();
Settings transportSettings = Settings.builder()
.put("transport.tcp_no_delay", enable)
.put("transport.tcp.keep_alive", enable)
.put("transport.tcp.reuse_address", enable)
.put("transport.tcp.send_buffer_size", "43000b")
.put("transport.tcp.receive_buffer_size", "42000b")
.put("transport.publish_host", "the_publish_host")
.put("transport.tcp.port", "9700-9800")
.put("transport.bind_host", "the_bind_host")
.put(globalSettings2)
.build();
Settings transportSettings2 = Settings.builder()
.put("transport.tcp_no_delay", !enable)
.put("transport.tcp.keep_alive", !enable)
.put("transport.tcp.reuse_address", !enable)
.put("transport.tcp.send_buffer_size", "5b")
.put("transport.tcp.receive_buffer_size", "6b")
.put("transport.publish_host", "another_publish_host")
.put("transport.tcp.port", "9702-9802")
.put("transport.bind_host", "another_bind_host")
.put(globalSettings2)
.build();
Settings defaultProfileSettings = Settings.builder()
.put("transport.profiles.default.tcp_no_delay", enable)
.put("transport.profiles.default.tcp_keep_alive", enable)
.put("transport.profiles.default.reuse_address", enable)
.put("transport.profiles.default.send_buffer_size", "43000b")
.put("transport.profiles.default.receive_buffer_size", "42000b")
.put("transport.profiles.default.port", "9700-9800")
.put("transport.profiles.default.publish_host", "the_publish_host")
.put("transport.profiles.default.bind_host", "the_bind_host")
.put("transport.profiles.default.publish_port", 42)
.put(randomBoolean() ? transportSettings2 : globalSettings2) // ensure that we have profile precedence
.build();
Settings profileSettings = Settings.builder()
.put("transport.profiles.some_profile.tcp_no_delay", enable)
.put("transport.profiles.some_profile.tcp_keep_alive", enable)
.put("transport.profiles.some_profile.reuse_address", enable)
.put("transport.profiles.some_profile.send_buffer_size", "43000b")
.put("transport.profiles.some_profile.receive_buffer_size", "42000b")
.put("transport.profiles.some_profile.port", "9700-9800")
.put("transport.profiles.some_profile.publish_host", "the_publish_host")
.put("transport.profiles.some_profile.bind_host", "the_bind_host")
.put("transport.profiles.some_profile.publish_port", 42)
.put(randomBoolean() ? transportSettings2 : globalSettings2) // ensure that we have profile precedence
.put(randomBoolean() ? defaultProfileSettings : Settings.EMPTY)
.build();
Settings randomSettings = randomFrom(random(), globalSettings, transportSettings, profileSettings);
ClusterSettings clusterSettings = new ClusterSettings(randomSettings, ClusterSettings
.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings.validate(randomSettings);
TcpTransport.ProfileSettings settings = new TcpTransport.ProfileSettings(
Settings.builder().put(randomSettings).put("transport.profiles.some_profile.port", "9700-9800").build(), // port is required
"some_profile");
assertEquals(enable, settings.tcpNoDelay);
assertEquals(enable, settings.tcpKeepAlive);
assertEquals(enable, settings.reuseAddress);
assertEquals(43000, settings.sendBufferSize.getBytes());
assertEquals(42000, settings.receiveBufferSize.getBytes());
if (randomSettings == profileSettings) {
assertEquals(42, settings.publishPort);
} else {
assertEquals(-1, settings.publishPort);
}
if (randomSettings == globalSettings) { // publish host has no global fallback for the profile since we later resolve it based on
// the bound address
assertEquals(Collections.emptyList(), settings.publishHosts);
} else {
assertEquals(Collections.singletonList("the_publish_host"), settings.publishHosts);
}
assertEquals("9700-9800", settings.portOrRange);
assertEquals(Collections.singletonList("the_bind_host"), settings.bindHosts);
}
public void testProfilesIncludesDefault() {
Set<TcpTransport.ProfileSettings> profileSettings = TcpTransport.getProfileSettings(Settings.EMPTY);
assertEquals(1, profileSettings.size());
assertEquals(TcpTransport.DEFAULT_PROFILE, profileSettings.stream().findAny().get().profileName);
profileSettings = TcpTransport.getProfileSettings(Settings.builder()
.put("transport.profiles.test.port", "0")
.build());
assertEquals(2, profileSettings.size());
assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors
.toSet()));
profileSettings = TcpTransport.getProfileSettings(Settings.builder()
.put("transport.profiles.test.port", "0")
.put("transport.profiles.default.port", "0")
.build());
assertEquals(2, profileSettings.size());
assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors
.toSet()));
}
} }

View File

@ -389,10 +389,8 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
try { try {
if (NetworkService.NETWORK_SERVER.get(settings)) { if (NetworkService.NETWORK_SERVER.get(settings)) {
// loop through all profiles and start them up, special handling for default one // loop through all profiles and start them up, special handling for default one
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) { for (ProfileSettings profileSettings : profileSettings) {
final Settings profileSettings = Settings.builder() bindServer(profileSettings);
.put(entry.getValue()).build();
bindServer(entry.getKey(), profileSettings);
} }
} }
super.doStart(); super.doStart();

View File

@ -45,7 +45,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -69,7 +69,6 @@ public class NioTransport extends TcpTransport<NioChannel> {
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
private final TcpReadHandler tcpReadHandler = new TcpReadHandler(this); private final TcpReadHandler tcpReadHandler = new TcpReadHandler(this);
private final BigArrays bigArrays;
private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap(); private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap();
private final OpenChannels openChannels = new OpenChannels(logger); private final OpenChannels openChannels = new OpenChannels(logger);
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>(); private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
@ -80,7 +79,6 @@ public class NioTransport extends TcpTransport<NioChannel> {
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
this.bigArrays = bigArrays;
} }
@Override @Override
@ -175,13 +173,9 @@ public class NioTransport extends TcpTransport<NioChannel> {
acceptors.add(acceptor); acceptors.add(acceptor);
} }
// loop through all profiles and start them up, special handling for default one // loop through all profiles and start them up, special handling for default one
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) { for (ProfileSettings profileSettings : profileSettings) {
// merge fallback settings with default settings with profile settings so we have complete settings with default values profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, tcpReadHandler));
final Settings profileSettings = Settings.builder() bindServer(profileSettings);
.put(createFallbackSettings())
.put(entry.getValue()).build();
profileToChannelFactory.putIfAbsent(entry.getKey(), new ChannelFactory(profileSettings, tcpReadHandler));
bindServer(entry.getKey(), profileSettings);
} }
} }
client = createClient(); client = createClient();
@ -269,7 +263,7 @@ public class NioTransport extends TcpTransport<NioChannel> {
private NioClient createClient() { private NioClient createClient() {
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
ChannelFactory channelFactory = new ChannelFactory(settings, tcpReadHandler); ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler);
return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory); return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory);
} }

View File

@ -19,8 +19,6 @@
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio.channel;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.mocksocket.PrivilegedSocketAccess;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.TcpReadHandler; import org.elasticsearch.transport.nio.TcpReadHandler;
@ -31,9 +29,6 @@ import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
public class ChannelFactory { public class ChannelFactory {
@ -44,12 +39,12 @@ public class ChannelFactory {
private final int tcpReceiveBufferSize; private final int tcpReceiveBufferSize;
private final TcpReadHandler handler; private final TcpReadHandler handler;
public ChannelFactory(Settings settings, TcpReadHandler handler) { public ChannelFactory(TcpTransport.ProfileSettings profileSettings, TcpReadHandler handler) {
tcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings); tcpNoDelay = profileSettings.tcpNoDelay;
tcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings); tcpKeepAlive = profileSettings.tcpKeepAlive;
tcpReusedAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings); tcpReusedAddress = profileSettings.reuseAddress;
tcpSendBufferSize = Math.toIntExact(TcpTransport.TCP_SEND_BUFFER_SIZE.get(settings).getBytes()); tcpSendBufferSize = Math.toIntExact(profileSettings.sendBufferSize.getBytes());
tcpReceiveBufferSize = Math.toIntExact(TcpTransport.TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes()); tcpReceiveBufferSize = Math.toIntExact(profileSettings.receiveBufferSize.getBytes());
this.handler = handler; this.handler = handler;
} }
@ -94,12 +89,4 @@ public class ChannelFactory {
socket.setSendBufferSize(tcpReceiveBufferSize); socket.setSendBufferSize(tcpReceiveBufferSize);
} }
} }
private static <T> T getSocketChannel(CheckedSupplier<T, IOException> supplier) throws IOException {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<T>) supplier::get);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.TcpReadHandler; import org.elasticsearch.transport.nio.TcpReadHandler;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -39,7 +40,8 @@ import static org.mockito.Mockito.mock;
public abstract class AbstractNioChannelTestCase extends ESTestCase { public abstract class AbstractNioChannelTestCase extends ESTestCase {
ChannelFactory channelFactory = new ChannelFactory(Settings.EMPTY, mock(TcpReadHandler.class)); ChannelFactory channelFactory = new ChannelFactory(new TcpTransport.ProfileSettings(Settings.EMPTY, "default"),
mock(TcpReadHandler.class));
MockServerSocket mockServerSocket; MockServerSocket mockServerSocket;
private Thread serverThread; private Thread serverThread;